blob: a7643df7d297adac82edabe806fa12fe6fbc53be [file] [log] [blame]
Don Newton98fd8812019-09-23 15:15:02 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/grpcsync"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
46 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
47 "google.golang.org/grpc/serviceconfig"
48 "google.golang.org/grpc/status"
49)
50
const (
	// minConnectTimeout is the minimum time a connection attempt is given
	// to complete before it may be abandoned.
	minConnectTimeout = 20 * time.Second
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
57
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config supplied via WithDefaultServiceConfig.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
75
// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
93
const (
	// defaultClientMaxReceiveMessageSize caps inbound message size (4 MiB).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize effectively places no limit on
	// outbound message size.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// http2IOBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
101
// Dial creates a client connection to the given target.
//
// It is a convenience wrapper around DialContext using context.Background;
// see DialContext for target-name syntax and the blocking/non-blocking
// behavior controlled by dial options.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}
106
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so later Load calls can type-assert safely before a
	// service config installs a real throttler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx governs the lifetime of the ClientConn itself; it is deliberately
	// independent of the dial ctx, which only bounds the dial.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply all dial options before anything below reads cc.dopts.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Fold chained interceptors plus the single-interceptor options into one
	// unary and one stream interceptor each.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Any error return below must release everything allocated so far.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		// Register this channel with channelz, nesting under the parent
		// channel if one was supplied via dial options.
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the transport-security configuration: secure dials require
	// exactly one credential source; insecure dials must carry none.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config, if one was supplied as raw JSON.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	// mkp may later be raised in response to a GoAway (see adjustParams).
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Default dialer: a proxy-aware wrapper around net.Dialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Always advertise the grpc-go user agent, appended to any custom one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// WithTimeout bounds the whole dial via the context.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If the dial ctx ended during setup, report its error rather than
	// returning a partially constructed connection.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config without blocking.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	// Default connection backoff strategy.
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Pick the authority: explicit ServerName from credentials wins, then
	// WithAuthority (insecure only), then the parsed target endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep watching for service config updates for the life of the conn.
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Surface permanent (non-temporary) dial errors immediately
				// instead of waiting for the context to expire.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336 interceptors := cc.dopts.chainUnaryInts
337 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338 // be executed before any other chained interceptors.
339 if cc.dopts.unaryInt != nil {
340 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341 }
342 var chainedInt UnaryClientInterceptor
343 if len(interceptors) == 0 {
344 chainedInt = nil
345 } else if len(interceptors) == 1 {
346 chainedInt = interceptors[0]
347 } else {
348 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350 }
351 }
352 cc.dopts.unaryInt = chainedInt
353}
354
355// getChainUnaryInvoker recursively generate the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357 if curr == len(interceptors)-1 {
358 return finalInvoker
359 }
360 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362 }
363}
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367 interceptors := cc.dopts.chainStreamInts
368 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369 // be executed before any other chained interceptors.
370 if cc.dopts.streamInt != nil {
371 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372 }
373 var chainedInt StreamClientInterceptor
374 if len(interceptors) == 0 {
375 chainedInt = nil
376 } else if len(interceptors) == 1 {
377 chainedInt = interceptors[0]
378 } else {
379 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381 }
382 }
383 cc.dopts.streamInt = chainedInt
384}
385
386// getChainStreamer recursively generate the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388 if curr == len(interceptors)-1 {
389 return finalStreamer
390 }
391 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393 }
394}
395
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu    sync.Mutex
	state connectivity.State
	// notifyChan, when non-nil, is closed (and replaced with nil) on the
	// next state change; see getNotifyChan/updateState.
	notifyChan chan struct{}
	// channelzID is the channelz identifier of the owning channel, used
	// for trace events.
	channelzID int64
}
404
405// updateState updates the connectivity.State of ClientConn.
406// If there's a change it notifies goroutines waiting on state change to
407// happen.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409 csm.mu.Lock()
410 defer csm.mu.Unlock()
411 if csm.state == connectivity.Shutdown {
412 return
413 }
414 if csm.state == state {
415 return
416 }
417 csm.state = state
418 if channelz.IsOn() {
419 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
421 Severity: channelz.CtINFO,
422 })
423 }
424 if csm.notifyChan != nil {
425 // There are other goroutines waiting on this channel.
426 close(csm.notifyChan)
427 csm.notifyChan = nil
428 }
429}
430
431func (csm *connectivityStateManager) getState() connectivity.State {
432 csm.mu.Lock()
433 defer csm.mu.Unlock()
434 return csm.state
435}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438 csm.mu.Lock()
439 defer csm.mu.Unlock()
440 if csm.notifyChan == nil {
441 csm.notifyChan = make(chan struct{})
442 }
443 return csm.notifyChan
444}
445
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx/cancel govern the lifetime of the connection; cancel is invoked
	// by Close.
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns tracks all live subconnections; nil means the ClientConn is
	// closed (checked throughout this file before dereferencing wrappers).
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler holds a *retryThrottler (possibly a typed nil).
	retryThrottler atomic.Value

	// firstResolveEvent fires once the resolver has reported at least once;
	// RPCs block on it via waitForResolvedAddrs.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
475
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
	// Fetch the notify channel BEFORE re-checking the state: if the state
	// changed between the two calls, the early return below catches it, and
	// otherwise the channel is guaranteed to be closed on the next change.
	ch := cc.csMgr.getNotifyChan()
	if cc.csMgr.getState() != sourceState {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}
491
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}
497
// scWatcher runs as a goroutine (started by DialContext when WithServiceConfig
// is used) and installs each service config received on dopts.scChan until
// the channel is closed or the ClientConn's context is canceled.
func (cc *ClientConn) scWatcher() {
	for {
		select {
		case sc, ok := <-cc.dopts.scChan:
			if !ok {
				// Channel closed by the provider; stop watching.
				return
			}
			cc.mu.Lock()
			// TODO: load balance policy runtime change is ignored.
			// We may revisit this decision in the future.
			cc.sc = &sc
			cc.mu.Unlock()
		case <-cc.ctx.Done():
			return
		}
	}
}
515
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	// This is on the RPC path, so we use a fast path to avoid the
	// more-expensive "select" below after the resolver has returned once.
	if cc.firstResolveEvent.HasFired() {
		return nil
	}
	select {
	case <-cc.firstResolveEvent.Done():
		return nil
	case <-ctx.Done():
		// RPC context expired before the first resolution.
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		// The ClientConn itself was closed.
		return ErrClientConnClosing
	}
}
534
// updateResolverState is invoked by the resolver wrapper with a new
// resolver.State (addresses plus, optionally, a service config). It applies
// the service config, selects or switches the load-balancing policy, and
// forwards the new state to the balancer. It always returns nil.
func (cc *ClientConn) updateResolverState(s resolver.State) error {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		return nil
	}

	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		// No usable config from the resolver: fall back to the dial-option
		// default, applied at most once (only while cc.sc is still nil).
		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
		}
	} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
		cc.applyServiceConfig(sc)
	}

	// Decide which balancer should handle this state.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			// The service config's loadBalancingConfig takes priority.
			newBalancerName = cc.sc.lbConfig.name
			balCfg = cc.sc.lbConfig.cfg
		} else {
			// Otherwise: grpclb if any resolved address is a balancer
			// address, then the legacy LB field, then pick_first.
			var isGRPCLB bool
			for _, a := range s.Addresses {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}

	cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	return nil
}
588
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	// Balancer names are case-insensitive; no-op if already current.
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	grpclog.Infof("ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		// A balancer fixed via dial option can never be switched away from.
		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	// Close the old balancer before building the new one.
	if cc.balancerWrapper != nil {
		cc.balancerWrapper.close()
	}

	builder := balancer.Get(name)
	if channelz.IsOn() {
		// Trace whether we found the requested policy or will fall back.
		if builder == nil {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
				Severity: channelz.CtWarning,
			})
		} else {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
				Severity: channelz.CtINFO,
			})
		}
	}
	if builder == nil {
		// Unknown balancer name: fall back to pick_first.
		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
633
634func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
635 cc.mu.Lock()
636 if cc.conns == nil {
637 cc.mu.Unlock()
638 return
639 }
640 // TODO(bar switching) send updates to all balancer wrappers when balancer
641 // gracefully switching is supported.
642 cc.balancerWrapper.handleSubConnStateChange(sc, s)
643 cc.mu.Unlock()
644}
645
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	// The addrConn's lifetime is bounded by the ClientConn's.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		// ClientConn already closed.
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		// Register the subchannel with channelz under this channel.
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}
680
// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		// ClientConn already closed; Close tears down all conns itself.
		cc.mu.Unlock()
		return
	}
	delete(cc.conns, ac)
	cc.mu.Unlock()
	// tearDown is called after releasing cc.mu; it takes ac.mu internally.
	ac.tearDown(err)
}
693
694func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
695 return &channelz.ChannelInternalMetric{
696 State: cc.GetState(),
697 Target: cc.target,
698 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
699 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
700 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
701 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
702 }
703}
704
// Target returns the target string of the ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	return cc.target
}
710
// incrCallsStarted atomically bumps the started-call counter and records
// the start time of the most recent call, for channelz reporting.
func (cc *ClientConn) incrCallsStarted() {
	atomic.AddInt64(&cc.czData.callsStarted, 1)
	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}
715
// incrCallsSucceeded atomically bumps the succeeded-call counter for channelz.
func (cc *ClientConn) incrCallsSucceeded() {
	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}
719
// incrCallsFailed atomically bumps the failed-call counter for channelz.
func (cc *ClientConn) incrCallsFailed() {
	atomic.AddInt64(&cc.czData.callsFailed, 1)
}
723
// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	if ac.state != connectivity.Idle {
		// Already connecting/connected; nothing to do.
		ac.mu.Unlock()
		return nil
	}
	// Update connectivity state within the lock to prevent subsequent or
	// concurrent calls from resetting the transport more than once.
	ac.updateConnectivityState(connectivity.Connecting)
	ac.mu.Unlock()

	// Start a goroutine connecting to the server asynchronously.
	go ac.resetTransport()
	return nil
}
746
747// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
748//
749// If ac is Connecting, it returns false. The caller should tear down the ac and
750// create a new one. Note that the backoff will be reset when this happens.
751//
752// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
753// addresses will be picked up by retry in the next iteration after backoff.
754//
755// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
756//
757// If ac is Ready, it checks whether current connected address of ac is in the
758// new addrs list.
759// - If true, it updates ac.addrs and returns true. The ac will keep using
760// the existing connection.
761// - If false, it does nothing and returns false.
762func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
763 ac.mu.Lock()
764 defer ac.mu.Unlock()
765 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
766 if ac.state == connectivity.Shutdown ||
767 ac.state == connectivity.TransientFailure ||
768 ac.state == connectivity.Idle {
769 ac.addrs = addrs
770 return true
771 }
772
773 if ac.state == connectivity.Connecting {
774 return false
775 }
776
777 // ac.state is Ready, try to find the connected address.
778 var curAddrFound bool
779 for _, a := range addrs {
780 if reflect.DeepEqual(ac.curAddr, a) {
781 curAddrFound = true
782 break
783 }
784 }
785 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
786 if curAddrFound {
787 ac.addrs = addrs
788 }
789
790 return curAddrFound
791}
792
793// GetMethodConfig gets the method config of the input method.
794// If there's an exact match for input method (i.e. /service/method), we return
795// the corresponding MethodConfig.
796// If there isn't an exact match for the input method, we look for the default config
797// under the service (i.e /service/). If there is a default MethodConfig for
798// the service, we return it.
799// Otherwise, we return an empty MethodConfig.
800func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
801 // TODO: Avoid the locking here.
802 cc.mu.RLock()
803 defer cc.mu.RUnlock()
804 if cc.sc == nil {
805 return MethodConfig{}
806 }
807 m, ok := cc.sc.Methods[method]
808 if !ok {
809 i := strings.LastIndex(method, "/")
810 m = cc.sc.Methods[method[:i+1]]
811 }
812 return m
813}
814
815func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
816 cc.mu.RLock()
817 defer cc.mu.RUnlock()
818 if cc.sc == nil {
819 return nil
820 }
821 return cc.sc.healthCheckConfig
822}
823
// getTransport picks a ready transport for an RPC on the given method via
// the blocking picker, converting any pick error into an RPC status error.
// The returned done func (may be nil) must be called when the RPC finishes.
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
		FullMethodName: method,
	})
	if err != nil {
		return nil, nil, toRPCErr(err)
	}
	return t, done, nil
}
833
// applyServiceConfig installs sc as the active service config and
// (re)configures the retry throttler from its retryThrottling section.
// Callers must hold cc.mu. sc must be non-nil.
func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
	if sc == nil {
		// should never reach here.
		return fmt.Errorf("got nil pointer for service config")
	}
	cc.sc = sc

	if cc.sc.retryThrottling != nil {
		newThrottler := &retryThrottler{
			tokens: cc.sc.retryThrottling.MaxTokens,
			max:    cc.sc.retryThrottling.MaxTokens,
			// Retries stop when the token count drops below half the max.
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
			ratio:  cc.sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		// Store a typed nil so Load callers can still type-assert.
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	return nil
}
855
// resolveNow asks the resolver (if any) to re-resolve immediately. The
// wrapper is captured under the read lock and invoked on a fresh goroutine
// so the caller is never blocked by the resolver.
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
	cc.mu.RLock()
	r := cc.resolverWrapper
	cc.mu.RUnlock()
	if r == nil {
		// ClientConn closed (or resolver not built yet).
		return
	}
	go r.resolveNow(o)
}
865
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	for ac := range cc.conns {
		ac.resetConnectBackoff()
	}
}
883
// Close tears down the ClientConn and all underlying connections.
//
// It marks the conn closed under cc.mu (cc.conns = nil is the closed
// sentinel checked throughout this file), then closes the picker, resolver,
// balancer, and every addrConn OUTSIDE the lock to avoid deadlocks with
// callbacks that acquire cc.mu. Subsequent calls return ErrClientConnClosing.
func (cc *ClientConn) Close() error {
	// Cancel cc.ctx last so in-flight teardown sees the conn as closing.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		// Already closed.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the wrappers under the lock; close them after releasing it.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
933
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx/cancel are derived from the parent ClientConn's context.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced) to wake a backoff sleep early.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
963
964// Note: this requires a lock on ac.mu.
965func (ac *addrConn) updateConnectivityState(s connectivity.State) {
966 if ac.state == s {
967 return
968 }
969
970 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
971 ac.state = s
972 if channelz.IsOn() {
973 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
974 Desc: updateMsg,
975 Severity: channelz.CtINFO,
976 })
977 }
978 ac.cc.handleSubConnStateChange(ac.acbw, s)
979}
980
981// adjustParams updates parameters used to create transports upon
982// receiving a GoAway.
983func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
984 switch r {
985 case transport.GoAwayTooManyPings:
986 v := 2 * ac.dopts.copts.KeepaliveParams.Time
987 ac.cc.mu.Lock()
988 if v > ac.cc.mkp.Time {
989 ac.cc.mkp.Time = v
990 }
991 ac.cc.mu.Unlock()
992 }
993}
994
// resetTransport loops forever trying to (re)establish a transport to one of
// ac's addresses, honoring a per-round connect deadline and exponential
// backoff between failed rounds. It returns only when ac is shut down or its
// context is done.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// On every round after the first, ask the resolver to re-resolve in
		// case the address list is stale.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			// Snapshot resetBackoff under the lock: resetConnectBackoff
			// replaces the channel, and closing the old one wakes us early.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff reset requested; retry immediately without
				// advancing backoffIdx.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Torn down while we were connecting; discard the new transport.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// hctx outlives this iteration; hcancel stops the health checker when
		// the transport goes down.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING. This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1091
1092// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1093// first successful one. It returns the transport, the address and a Event in
1094// the successful case. The Event fires when the returned transport disconnects.
1095func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1096 for _, addr := range addrs {
1097 ac.mu.Lock()
1098 if ac.state == connectivity.Shutdown {
1099 ac.mu.Unlock()
1100 return nil, resolver.Address{}, nil, errConnClosing
1101 }
1102
1103 ac.cc.mu.RLock()
1104 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1105 ac.cc.mu.RUnlock()
1106
1107 copts := ac.dopts.copts
1108 if ac.scopts.CredsBundle != nil {
1109 copts.CredsBundle = ac.scopts.CredsBundle
1110 }
1111 ac.mu.Unlock()
1112
1113 if channelz.IsOn() {
1114 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1115 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1116 Severity: channelz.CtINFO,
1117 })
1118 }
1119
1120 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1121 if err == nil {
1122 return newTr, addr, reconnect, nil
1123 }
1124 ac.cc.blockingpicker.updateConnectionError(err)
1125 }
1126
1127 // Couldn't connect to any address.
1128 return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
1129}
1130
1131// createTransport creates a connection to addr. It returns the transport and a
1132// Event in the successful case. The Event fires when the returned transport
1133// disconnects.
1134func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1135 prefaceReceived := make(chan struct{})
1136 onCloseCalled := make(chan struct{})
1137 reconnect := grpcsync.NewEvent()
1138
1139 target := transport.TargetInfo{
1140 Addr: addr.Addr,
1141 Metadata: addr.Metadata,
1142 Authority: ac.cc.authority,
1143 }
1144
1145 once := sync.Once{}
1146 onGoAway := func(r transport.GoAwayReason) {
1147 ac.mu.Lock()
1148 ac.adjustParams(r)
1149 once.Do(func() {
1150 if ac.state == connectivity.Ready {
1151 // Prevent this SubConn from being used for new RPCs by setting its
1152 // state to Connecting.
1153 //
1154 // TODO: this should be Idle when grpc-go properly supports it.
1155 ac.updateConnectivityState(connectivity.Connecting)
1156 }
1157 })
1158 ac.mu.Unlock()
1159 reconnect.Fire()
1160 }
1161
1162 onClose := func() {
1163 ac.mu.Lock()
1164 once.Do(func() {
1165 if ac.state == connectivity.Ready {
1166 // Prevent this SubConn from being used for new RPCs by setting its
1167 // state to Connecting.
1168 //
1169 // TODO: this should be Idle when grpc-go properly supports it.
1170 ac.updateConnectivityState(connectivity.Connecting)
1171 }
1172 })
1173 ac.mu.Unlock()
1174 close(onCloseCalled)
1175 reconnect.Fire()
1176 }
1177
1178 onPrefaceReceipt := func() {
1179 close(prefaceReceived)
1180 }
1181
1182 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1183 defer cancel()
1184 if channelz.IsOn() {
1185 copts.ChannelzParentID = ac.channelzID
1186 }
1187
1188 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1189 if err != nil {
1190 // newTr is either nil, or closed.
1191 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1192 return nil, nil, err
1193 }
1194
1195 select {
1196 case <-time.After(connectDeadline.Sub(time.Now())):
1197 // We didn't get the preface in time.
1198 newTr.Close()
1199 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1200 return nil, nil, errors.New("timed out waiting for server handshake")
1201 case <-prefaceReceived:
1202 // We got the preface - huzzah! things are good.
1203 case <-onCloseCalled:
1204 // The transport has already closed - noop.
1205 return nil, nil, errors.New("connection closed")
1206 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1207 }
1208 return newTr, reconnect, nil
1209}
1210
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	// If health checking does not end up managing the state (any of the early
	// returns below), report READY immediately on the way out.
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		grpclog.Error("Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// currentTr pins the transport this health checker belongs to; streams
	// and state updates are ignored once ac moves to a different transport.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s)
	}
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				if channelz.IsOn() {
					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
						Severity: channelz.CtError,
					})
				}
				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1289
1290func (ac *addrConn) resetConnectBackoff() {
1291 ac.mu.Lock()
1292 close(ac.resetBackoff)
1293 ac.backoffIdx = 0
1294 ac.resetBackoff = make(chan struct{})
1295 ac.mu.Unlock()
1296}
1297
1298// getReadyTransport returns the transport if ac's state is READY.
1299// Otherwise it returns nil, false.
1300// If ac's state is IDLE, it will trigger ac to connect.
1301func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1302 ac.mu.Lock()
1303 if ac.state == connectivity.Ready && ac.transport != nil {
1304 t := ac.transport
1305 ac.mu.Unlock()
1306 return t, true
1307 }
1308 var idle bool
1309 if ac.state == connectivity.Idle {
1310 idle = true
1311 }
1312 ac.mu.Unlock()
1313 // Trigger idle ac to connect.
1314 if idle {
1315 ac.connect()
1316 }
1317 return nil, false
1318}
1319
1320// tearDown starts to tear down the addrConn.
1321// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1322// some edge cases (e.g., the caller opens and closes many addrConn's in a
1323// tight loop.
1324// tearDown doesn't remove ac from ac.cc.conns.
1325func (ac *addrConn) tearDown(err error) {
1326 ac.mu.Lock()
1327 if ac.state == connectivity.Shutdown {
1328 ac.mu.Unlock()
1329 return
1330 }
1331 curTr := ac.transport
1332 ac.transport = nil
1333 // We have to set the state to Shutdown before anything else to prevent races
1334 // between setting the state and logic that waits on context cancelation / etc.
1335 ac.updateConnectivityState(connectivity.Shutdown)
1336 ac.cancel()
1337 ac.curAddr = resolver.Address{}
1338 if err == errConnDrain && curTr != nil {
1339 // GracefulClose(...) may be executed multiple times when
1340 // i) receiving multiple GoAway frames from the server; or
1341 // ii) there are concurrent name resolver/Balancer triggered
1342 // address removal and GoAway.
1343 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1344 ac.mu.Unlock()
1345 curTr.GracefulClose()
1346 ac.mu.Lock()
1347 }
1348 if channelz.IsOn() {
1349 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1350 Desc: "Subchannel Deleted",
1351 Severity: channelz.CtINFO,
1352 Parent: &channelz.TraceEventDesc{
1353 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1354 Severity: channelz.CtINFO,
1355 },
1356 })
1357 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1358 // the entity beng deleted, and thus prevent it from being deleted right away.
1359 channelz.RemoveEntry(ac.channelzID)
1360 }
1361 ac.mu.Unlock()
1362}
1363
1364func (ac *addrConn) getState() connectivity.State {
1365 ac.mu.Lock()
1366 defer ac.mu.Unlock()
1367 return ac.state
1368}
1369
1370func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1371 ac.mu.Lock()
1372 addr := ac.curAddr.Addr
1373 ac.mu.Unlock()
1374 return &channelz.ChannelInternalMetric{
1375 State: ac.getState(),
1376 Target: addr,
1377 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1378 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1379 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1380 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1381 }
1382}
1383
// incrCallsStarted atomically bumps the channelz started-call counter and
// records the start time of the most recent call.
func (ac *addrConn) incrCallsStarted() {
	atomic.AddInt64(&ac.czData.callsStarted, 1)
	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}
1388
// incrCallsSucceeded atomically bumps the channelz succeeded-call counter.
func (ac *addrConn) incrCallsSucceeded() {
	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}
1392
// incrCallsFailed atomically bumps the channelz failed-call counter.
func (ac *addrConn) incrCallsFailed() {
	atomic.AddInt64(&ac.czData.callsFailed, 1)
}
1396
// retryThrottler implements the token-bucket retry throttling policy from the
// service config. A nil *retryThrottler means throttling is disabled.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		// Throttling not configured: never throttle.
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	// Spend one token, clamping at zero.
	rt.tokens = math.Max(rt.tokens-1, 0)
	return rt.tokens <= rt.thresh
}

// successfulRPC refunds tokens after a successful RPC, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens = math.Min(rt.tokens+rt.ratio, rt.max)
}
1433
// channelzChannel adapts a *ClientConn to the shape channelz expects for
// reporting channel metrics.
type channelzChannel struct {
	cc *ClientConn
}

// ChannelzMetric returns the channelz metrics of the wrapped ClientConn.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	return c.cc.channelzMetric()
}
1441
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users. It is kept only for backward compatibility.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")