/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/envconfig"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"
)

const (
	// minimum time to give a connection to complete
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// use the WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize specifies the default buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use the WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be a no-op. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use the dns resolver, apply a "dns:///" prefix to the target.
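//
// A minimal illustrative sketch of a blocking dial with a timeout (the target
// address and options shown here are hypothetical, not taken from this file):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	conn, err := grpc.DialContext(ctx, "dns:///localhost:50051",
//		grpc.WithInsecure(), grpc.WithBlock())
//	if err != nil {
//		// handle dial error
//	}
//	defer conn.Close()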
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fall back to the default resolver and set
			// Endpoint to the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx timed out or was canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
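	// Illustrative ordering, assuming dial options such as WithUnaryInterceptor(A)
	// and WithChainUnaryInterceptor(B, C) with hypothetical interceptors A, B, C:
	// a unary call then runs A -> B -> C -> invoker.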
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	if curr == len(interceptors)-1 {
		return finalInvoker
	}
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
	}
}

// chainStreamClientInterceptors chains all stream client interceptors into one.
func chainStreamClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainStreamInts
	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	if cc.dopts.streamInt != nil {
		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
	}
	var chainedInt StreamClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
		}
	}
	cc.dopts.streamInt = chainedInt
}

// getChainStreamer recursively generates the chained client stream constructor.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
	if curr == len(interceptors)-1 {
		return finalStreamer
	}
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
	}
}

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	channelzID int64
}

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	if channelz.IsOn() {
		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
			Severity: channelz.CtINFO,
		})
	}
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}

func (csm *connectivityStateManager) getState() connectivity.State {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	return csm.state
}

func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.notifyChan == nil {
		csm.notifyChan = make(chan struct{})
	}
	return csm.notifyChan
}

// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in the former case and false in the latter.
// This is an EXPERIMENTAL API.
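//
// An illustrative caller-side sketch of waiting until a connection becomes
// Ready (not part of this file; conn and ctx are assumed to exist):
//
//	for {
//		s := conn.GetState()
//		if s == connectivity.Ready {
//			break
//		}
//		if !conn.WaitForStateChange(ctx, s) {
//			// ctx expired before the state changed.
//			break
//		}
//	}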
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
	ch := cc.csMgr.getNotifyChan()
	if cc.csMgr.getState() != sourceState {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}

// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}

func (cc *ClientConn) scWatcher() {
	for {
		select {
		case sc, ok := <-cc.dopts.scChan:
			if !ok {
				return
			}
			cc.mu.Lock()
			// TODO: load balance policy runtime change is ignored.
			// We may revisit this decision in the future.
			cc.sc = &sc
			cc.mu.Unlock()
		case <-cc.ctx.Done():
			return
		}
	}
}

// waitForResolvedAddrs blocks until the resolver has provided addresses or the
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	// This is on the RPC path, so we use a fast path to avoid the
	// more-expensive "select" below after the resolver has returned once.
	if cc.firstResolveEvent.HasFired() {
		return nil
	}
	select {
	case <-cc.firstResolveEvent.Done():
		return nil
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	}
}

func (cc *ClientConn) updateResolverState(s resolver.State) error {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		return nil
	}

	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
		}
	} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
		cc.applyServiceConfig(sc)
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
			balCfg = cc.sc.lbConfig.cfg
		} else {
			var isGRPCLB bool
			for _, a := range s.Addresses {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}

	cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	return nil
}

// switchBalancer starts the switching from the current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// the caller of this function should send the address list to the new balancer
// after this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	grpclog.Infof("ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		cc.balancerWrapper.close()
	}

	builder := balancer.Get(name)
	if channelz.IsOn() {
		if builder == nil {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
				Severity: channelz.CtWarning,
			})
		} else {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
				Severity: channelz.CtINFO,
			})
		}
	}
	if builder == nil {
		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	// TODO(bar switching) send updates to all balancer wrappers when balancer
	// gracefully switching is supported.
	cc.balancerWrapper.handleSubConnStateChange(sc, s)
	cc.mu.Unlock()
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}

// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	delete(cc.conns, ac)
	cc.mu.Unlock()
	ac.tearDown(err)
}

func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
	return &channelz.ChannelInternalMetric{
		State:                    cc.GetState(),
		Target:                   cc.target,
		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
	}
}

// Target returns the target string of the ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	return cc.target
}

func (cc *ClientConn) incrCallsStarted() {
	atomic.AddInt64(&cc.czData.callsStarted, 1)
	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (cc *ClientConn) incrCallsSucceeded() {
	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}

func (cc *ClientConn) incrCallsFailed() {
	atomic.AddInt64(&cc.czData.callsFailed, 1)
}

// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	if ac.state != connectivity.Idle {
		ac.mu.Unlock()
		return nil
	}
	// Update connectivity state within the lock to prevent subsequent or
	// concurrent calls from resetting the transport more than once.
	ac.updateConnectivityState(connectivity.Connecting)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}

// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// If ac is Connecting, it returns false. The caller should tear down the ac and
// create a new one. Note that the backoff will be reset when this happens.
//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
// If ac is Ready, it checks whether the currently connected address of ac is in
// the new addrs list.
//  - If true, it updates ac.addrs and returns true. The ac will keep using
//    the existing connection.
//  - If false, it does nothing and returns false.
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		ac.addrs = addrs
		return true
	}

	if ac.state == connectivity.Connecting {
		return false
	}

	// ac.state is Ready, try to find the connected address.
	var curAddrFound bool
	for _, a := range addrs {
		if reflect.DeepEqual(ac.curAddr, a) {
			curAddrFound = true
			break
		}
	}
	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
	if curAddrFound {
		ac.addrs = addrs
	}

	return curAddrFound
}

// GetMethodConfig gets the method config of the input method.
// If there's an exact match for the input method (i.e. /service/method), we
// return the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default
// config under the service (i.e. /service/). If there is a default MethodConfig
// for the service, we return it.
// Otherwise, we return an empty MethodConfig.
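//
// For example (hypothetical service config): a lookup for
// "/helloworld.Greeter/SayHello" first tries that exact key in
// ServiceConfig.Methods, then falls back to the service-level default keyed by
// "/helloworld.Greeter/".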
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
	// TODO: Avoid the locking here.
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	if cc.sc == nil {
		return MethodConfig{}
	}
	m, ok := cc.sc.Methods[method]
	if !ok {
		i := strings.LastIndex(method, "/")
		m = cc.sc.Methods[method[:i+1]]
	}
	return m
}

func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	if cc.sc == nil {
		return nil
	}
	return cc.sc.healthCheckConfig
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
		FullMethodName: method,
	})
	if err != nil {
		return nil, nil, toRPCErr(err)
	}
	return t, done, nil
}

func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
	if sc == nil {
		// should never reach here.
		return fmt.Errorf("got nil pointer for service config")
	}
	cc.sc = sc

	if cc.sc.retryThrottling != nil {
		newThrottler := &retryThrottler{
			tokens: cc.sc.retryThrottling.MaxTokens,
			max:    cc.sc.retryThrottling.MaxTokens,
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
			ratio:  cc.sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	return nil
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
	cc.mu.RLock()
	r := cc.resolverWrapper
	cc.mu.RUnlock()
	if r == nil {
		return
	}
	go r.resolveNow(o)
}

// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	for ac := range cc.conns {
		ac.resetConnectBackoff()
	}
}

// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}

// addrConn is a network connection to a given address.
type addrConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}

// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
	if ac.state == s {
		return
	}

	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
	ac.state = s
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     updateMsg,
			Severity: channelz.CtINFO,
		})
	}
	ac.cc.handleSubConnStateChange(ac.acbw, s)
}

// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
	switch r {
	case transport.GoAwayTooManyPings:
		v := 2 * ac.dopts.copts.KeepaliveParams.Time
		ac.cc.mu.Lock()
		if v > ac.cc.mkp.Time {
			ac.cc.mkp.Time = v
		}
		ac.cc.mu.Unlock()
	}
}

func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
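		// Illustrative example (numbers are assumptions, not taken from this
		// file): with the default exponential backoff, backoffFor is well under
		// minConnectTimeout (20s) for the first few attempts, so dialDuration
		// stays at 20s; once repeated failures push backoffFor past 20s, the
		// larger backoff value bounds the dial instead.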
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			newTr.Close()
			ac.mu.Unlock()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()

		// We need to reconnect after a READY; the addrConn enters
		// TRANSIENT_FAILURE.
		//
		// This will set addrConn to TRANSIENT_FAILURE for a very short period
		// of time, and then turns CONNECTING. It seems reasonable to skip this,
		// but READY-CONNECTING is not a valid transition.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure)
		ac.mu.Unlock()
	}
}

// tryAllAddrs tries to create a connection to the addresses, and stops at the
// first successful one. It returns the transport, the address and an Event in
// the successful case. The Event fires when the returned transport disconnects.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
	for _, addr := range addrs {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return nil, resolver.Address{}, nil, errConnClosing
		}

		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()

		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		if channelz.IsOn() {
			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
				Severity: channelz.CtINFO,
			})
		}

		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
		if err == nil {
			return newTr, addr, reconnect, nil
		}
		ac.cc.blockingpicker.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
}

// createTransport creates a connection to addr. It returns the transport and an
// Event in the successful case. The Event fires when the returned transport
// disconnects.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	prefaceReceived := make(chan struct{})
	onCloseCalled := make(chan struct{})
	reconnect := grpcsync.NewEvent()

	target := transport.TargetInfo{
		Addr:      addr.Addr,
		Metadata:  addr.Metadata,
		Authority: ac.cc.authority,
	}

	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		ac.mu.Unlock()
		reconnect.Fire()
	}

	onClose := func() {
		close(onCloseCalled)
		reconnect.Fire()
	}

	onPrefaceReceipt := func() {
		close(prefaceReceived)
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
		return nil, nil, err
	}

	if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
		select {
		case <-time.After(connectDeadline.Sub(time.Now())):
			// We didn't get the preface in time.
			newTr.Close()
			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
			return nil, nil, errors.New("timed out waiting for server handshake")
		case <-prefaceReceived:
			// We got the preface - huzzah! things are good.
		case <-onCloseCalled:
			// The transport has already closed - noop.
			return nil, nil, errors.New("connection closed")
			// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
		}
	}
	return newTr, reconnect, nil
}

// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with a non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		grpclog.Error("Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s)
	}
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				if channelz.IsOn() {
					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
						Severity: channelz.CtError,
					})
				}
				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}

func (ac *addrConn) resetConnectBackoff() {
	ac.mu.Lock()
	close(ac.resetBackoff)
	ac.backoffIdx = 0
	ac.resetBackoff = make(chan struct{})
	ac.mu.Unlock()
}

// getReadyTransport returns the transport if ac's state is READY.
// Otherwise it returns nil, false.
// If ac's state is IDLE, it will trigger ac to connect.
func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
	ac.mu.Lock()
	if ac.state == connectivity.Ready && ac.transport != nil {
		t := ac.transport
		ac.mu.Unlock()
		return t, true
	}
	var idle bool
	if ac.state == connectivity.Idle {
		idle = true
	}
	ac.mu.Unlock()
	// Trigger idle ac to connect.
	if idle {
		ac.connect()
	}
	return nil, false
}

// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancelation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}

func (ac *addrConn) getState() connectivity.State {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	return ac.state
}

func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
	ac.mu.Lock()
	addr := ac.curAddr.Addr
	ac.mu.Unlock()
	return &channelz.ChannelInternalMetric{
		State:                    ac.getState(),
		Target:                   addr,
		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
	}
}

func (ac *addrConn) incrCallsStarted() {
	atomic.AddInt64(&ac.czData.callsStarted, 1)
	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}

func (ac *addrConn) incrCallsSucceeded() {
	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}

func (ac *addrConn) incrCallsFailed() {
	atomic.AddInt64(&ac.czData.callsFailed, 1)
}

type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
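//
// Illustrative arithmetic, assuming a service config with maxTokens=10 and
// tokenRatio=0.1 (hypothetical values): tokens start at 10 and thresh is 5;
// each throttle() call costs one token and each successful RPC refunds 0.1,
// so retries are disallowed once tokens drop to 5 or below.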
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens--
	if rt.tokens < 0 {
		rt.tokens = 0
	}
	return rt.tokens <= rt.thresh
}

func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens += rt.ratio
	if rt.tokens > rt.max {
		rt.tokens = rt.max
	}
}

type channelzChannel struct {
	cc *ClientConn
}

func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	return c.cc.channelzMetric()
}

// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")