blob: df1bb943add5003a1d7e779d8a14466975ff561f [file] [log] [blame]
Scott Baker2d897982019-09-24 11:50:08 -07001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/envconfig"
42 "google.golang.org/grpc/internal/grpcsync"
43 "google.golang.org/grpc/internal/transport"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/resolver"
47 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
48 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
49 "google.golang.org/grpc/status"
50)
51
// minConnectTimeout is the minimum time to give a connection to complete.
const minConnectTimeout = 20 * time.Second

// grpclbName is the name used for the grpclb balancer. It must match
// grpclbName in grpclb/grpclb.go.
const grpclbName = "grpclb"
58
59var (
60 // ErrClientConnClosing indicates that the operation is illegal because
61 // the ClientConn is closing.
62 //
63 // Deprecated: this error should not be relied upon by users; use the status
64 // code of Canceled instead.
65 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
66 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
67 errConnDrain = errors.New("grpc: the connection is drained")
68 // errConnClosing indicates that the connection is closing.
69 errConnClosing = errors.New("grpc: the connection is closing")
70 // errBalancerClosed indicates that the balancer is closed.
71 errBalancerClosed = errors.New("grpc: balancer is closed")
72 // We use an accessor so that minConnectTimeout can be
73 // atomically read and updated while testing.
74 getMinConnectTimeout = func() time.Duration {
75 return minConnectTimeout
76 }
77)
78
79// The following errors are returned from Dial and DialContext
var (
	// errNoTransportSecurity is returned when neither transport credentials
	// nor a credentials bundle were configured and the caller did not opt
	// out of security with the WithInsecure DialOption.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle is returned when a credentials bundle is
	// combined with individually supplied TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing is returned when per-RPC credentials
	// that require a secure transport (e.g., an OAuth2 token) are configured
	// on an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict is returned when both
	// grpc.WithTransportCredentials() and grpc.WithInsecure() were used for
	// the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
96
// Default message-size limits applied to a client.
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
)

// Default buffer sizes (32 KiB each).
const (
	// defaultWriteBufSize is the default write buffer size.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the default read buffer size.
	defaultReadBufSize = 32 * 1024
)
104
105// Dial creates a client connection to the given target.
106func Dial(target string, opts ...DialOption) (*ClientConn, error) {
107 return DialContext(context.Background(), target, opts...)
108}
109
110// DialContext creates a client connection to the given target. By default, it's
111// a non-blocking dial (the function won't wait for connections to be
112// established, and connecting happens in the background). To make it a blocking
113// dial, use WithBlock() dial option.
114//
115// In the non-blocking case, the ctx does not act against the connection. It
116// only controls the setup steps.
117//
118// In the blocking case, ctx can be used to cancel or expire the pending
119// connection. Once this function returns, the cancellation and expiration of
120// ctx will be noop. Users should call ClientConn.Close to terminate all the
121// pending operations after this function returns.
122//
123// The target name syntax is defined in
124// https://github.com/grpc/grpc/blob/master/doc/naming.md.
125// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
126func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
127 cc := &ClientConn{
128 target: target,
129 csMgr: &connectivityStateManager{},
130 conns: make(map[*addrConn]struct{}),
131 dopts: defaultDialOptions(),
132 blockingpicker: newPickerWrapper(),
133 czData: new(channelzData),
134 firstResolveEvent: grpcsync.NewEvent(),
135 }
136 cc.retryThrottler.Store((*retryThrottler)(nil))
137 cc.ctx, cc.cancel = context.WithCancel(context.Background())
138
139 for _, opt := range opts {
140 opt.apply(&cc.dopts)
141 }
142
143 if channelz.IsOn() {
144 if cc.dopts.channelzParentID != 0 {
145 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
146 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
147 Desc: "Channel Created",
148 Severity: channelz.CtINFO,
149 Parent: &channelz.TraceEventDesc{
150 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
151 Severity: channelz.CtINFO,
152 },
153 })
154 } else {
155 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
156 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
157 Desc: "Channel Created",
158 Severity: channelz.CtINFO,
159 })
160 }
161 cc.csMgr.channelzID = cc.channelzID
162 }
163
164 if !cc.dopts.insecure {
165 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
166 return nil, errNoTransportSecurity
167 }
168 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
169 return nil, errTransportCredsAndBundle
170 }
171 } else {
172 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
173 return nil, errCredentialsConflict
174 }
175 for _, cd := range cc.dopts.copts.PerRPCCredentials {
176 if cd.RequireTransportSecurity() {
177 return nil, errTransportCredentialsMissing
178 }
179 }
180 }
181
182 cc.mkp = cc.dopts.copts.KeepaliveParams
183
184 if cc.dopts.copts.Dialer == nil {
185 cc.dopts.copts.Dialer = newProxyDialer(
186 func(ctx context.Context, addr string) (net.Conn, error) {
187 network, addr := parseDialTarget(addr)
188 return (&net.Dialer{}).DialContext(ctx, network, addr)
189 },
190 )
191 }
192
193 if cc.dopts.copts.UserAgent != "" {
194 cc.dopts.copts.UserAgent += " " + grpcUA
195 } else {
196 cc.dopts.copts.UserAgent = grpcUA
197 }
198
199 if cc.dopts.timeout > 0 {
200 var cancel context.CancelFunc
201 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
202 defer cancel()
203 }
204
205 defer func() {
206 select {
207 case <-ctx.Done():
208 conn, err = nil, ctx.Err()
209 default:
210 }
211
212 if err != nil {
213 cc.Close()
214 }
215 }()
216
217 scSet := false
218 if cc.dopts.scChan != nil {
219 // Try to get an initial service config.
220 select {
221 case sc, ok := <-cc.dopts.scChan:
222 if ok {
223 cc.sc = sc
224 scSet = true
225 }
226 default:
227 }
228 }
229 if cc.dopts.bs == nil {
230 cc.dopts.bs = backoff.Exponential{
231 MaxDelay: DefaultBackoffConfig.MaxDelay,
232 }
233 }
234 if cc.dopts.resolverBuilder == nil {
235 // Only try to parse target when resolver builder is not already set.
236 cc.parsedTarget = parseTarget(cc.target)
237 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
238 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
239 if cc.dopts.resolverBuilder == nil {
240 // If resolver builder is still nil, the parsed target's scheme is
241 // not registered. Fallback to default resolver and set Endpoint to
242 // the original target.
243 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
244 cc.parsedTarget = resolver.Target{
245 Scheme: resolver.GetDefaultScheme(),
246 Endpoint: target,
247 }
248 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
249 }
250 } else {
251 cc.parsedTarget = resolver.Target{Endpoint: target}
252 }
253 creds := cc.dopts.copts.TransportCredentials
254 if creds != nil && creds.Info().ServerName != "" {
255 cc.authority = creds.Info().ServerName
256 } else if cc.dopts.insecure && cc.dopts.authority != "" {
257 cc.authority = cc.dopts.authority
258 } else {
259 // Use endpoint from "scheme://authority/endpoint" as the default
260 // authority for ClientConn.
261 cc.authority = cc.parsedTarget.Endpoint
262 }
263
264 if cc.dopts.scChan != nil && !scSet {
265 // Blocking wait for the initial service config.
266 select {
267 case sc, ok := <-cc.dopts.scChan:
268 if ok {
269 cc.sc = sc
270 }
271 case <-ctx.Done():
272 return nil, ctx.Err()
273 }
274 }
275 if cc.dopts.scChan != nil {
276 go cc.scWatcher()
277 }
278
279 var credsClone credentials.TransportCredentials
280 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
281 credsClone = creds.Clone()
282 }
283 cc.balancerBuildOpts = balancer.BuildOptions{
284 DialCreds: credsClone,
285 CredsBundle: cc.dopts.copts.CredsBundle,
286 Dialer: cc.dopts.copts.Dialer,
287 ChannelzParentID: cc.channelzID,
288 }
289
290 // Build the resolver.
291 rWrapper, err := newCCResolverWrapper(cc)
292 if err != nil {
293 return nil, fmt.Errorf("failed to build resolver: %v", err)
294 }
295
296 cc.mu.Lock()
297 cc.resolverWrapper = rWrapper
298 cc.mu.Unlock()
299 // A blocking dial blocks until the clientConn is ready.
300 if cc.dopts.block {
301 for {
302 s := cc.GetState()
303 if s == connectivity.Ready {
304 break
305 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
306 if err = cc.blockingpicker.connectionError(); err != nil {
307 terr, ok := err.(interface {
308 Temporary() bool
309 })
310 if ok && !terr.Temporary() {
311 return nil, err
312 }
313 }
314 }
315 if !cc.WaitForStateChange(ctx, s) {
316 // ctx got timeout or canceled.
317 return nil, ctx.Err()
318 }
319 }
320 }
321
322 return cc, nil
323}
324
325// connectivityStateManager keeps the connectivity.State of ClientConn.
326// This struct will eventually be exported so the balancers can access it.
327type connectivityStateManager struct {
328 mu sync.Mutex
329 state connectivity.State
330 notifyChan chan struct{}
331 channelzID int64
332}
333
334// updateState updates the connectivity.State of ClientConn.
335// If there's a change it notifies goroutines waiting on state change to
336// happen.
337func (csm *connectivityStateManager) updateState(state connectivity.State) {
338 csm.mu.Lock()
339 defer csm.mu.Unlock()
340 if csm.state == connectivity.Shutdown {
341 return
342 }
343 if csm.state == state {
344 return
345 }
346 csm.state = state
347 if channelz.IsOn() {
348 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
350 Severity: channelz.CtINFO,
351 })
352 }
353 if csm.notifyChan != nil {
354 // There are other goroutines waiting on this channel.
355 close(csm.notifyChan)
356 csm.notifyChan = nil
357 }
358}
359
360func (csm *connectivityStateManager) getState() connectivity.State {
361 csm.mu.Lock()
362 defer csm.mu.Unlock()
363 return csm.state
364}
365
366func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
367 csm.mu.Lock()
368 defer csm.mu.Unlock()
369 if csm.notifyChan == nil {
370 csm.notifyChan = make(chan struct{})
371 }
372 return csm.notifyChan
373}
374
375// ClientConn represents a client connection to an RPC server.
376type ClientConn struct {
377 ctx context.Context
378 cancel context.CancelFunc
379
380 target string
381 parsedTarget resolver.Target
382 authority string
383 dopts dialOptions
384 csMgr *connectivityStateManager
385
386 balancerBuildOpts balancer.BuildOptions
387 blockingpicker *pickerWrapper
388
389 mu sync.RWMutex
390 resolverWrapper *ccResolverWrapper
391 sc ServiceConfig
392 scRaw string
393 conns map[*addrConn]struct{}
394 // Keepalive parameter can be updated if a GoAway is received.
395 mkp keepalive.ClientParameters
396 curBalancerName string
397 preBalancerName string // previous balancer name.
398 curAddresses []resolver.Address
399 balancerWrapper *ccBalancerWrapper
400 retryThrottler atomic.Value
401
402 firstResolveEvent *grpcsync.Event
403
404 channelzID int64 // channelz unique identification number
405 czData *channelzData
406}
407
408// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
409// ctx expires. A true value is returned in former case and false in latter.
410// This is an EXPERIMENTAL API.
411func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
412 ch := cc.csMgr.getNotifyChan()
413 if cc.csMgr.getState() != sourceState {
414 return true
415 }
416 select {
417 case <-ctx.Done():
418 return false
419 case <-ch:
420 return true
421 }
422}
423
424// GetState returns the connectivity.State of ClientConn.
425// This is an EXPERIMENTAL API.
426func (cc *ClientConn) GetState() connectivity.State {
427 return cc.csMgr.getState()
428}
429
430func (cc *ClientConn) scWatcher() {
431 for {
432 select {
433 case sc, ok := <-cc.dopts.scChan:
434 if !ok {
435 return
436 }
437 cc.mu.Lock()
438 // TODO: load balance policy runtime change is ignored.
439 // We may revisit this decision in the future.
440 cc.sc = sc
441 cc.scRaw = ""
442 cc.mu.Unlock()
443 case <-cc.ctx.Done():
444 return
445 }
446 }
447}
448
449// waitForResolvedAddrs blocks until the resolver has provided addresses or the
450// context expires. Returns nil unless the context expires first; otherwise
451// returns a status error based on the context.
452func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
453 // This is on the RPC path, so we use a fast path to avoid the
454 // more-expensive "select" below after the resolver has returned once.
455 if cc.firstResolveEvent.HasFired() {
456 return nil
457 }
458 select {
459 case <-cc.firstResolveEvent.Done():
460 return nil
461 case <-ctx.Done():
462 return status.FromContextError(ctx.Err()).Err()
463 case <-cc.ctx.Done():
464 return ErrClientConnClosing
465 }
466}
467
468func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
469 cc.mu.Lock()
470 defer cc.mu.Unlock()
471 if cc.conns == nil {
472 // cc was closed.
473 return
474 }
475
476 if reflect.DeepEqual(cc.curAddresses, addrs) {
477 return
478 }
479
480 cc.curAddresses = addrs
481 cc.firstResolveEvent.Fire()
482
483 if cc.dopts.balancerBuilder == nil {
484 // Only look at balancer types and switch balancer if balancer dial
485 // option is not set.
486 var isGRPCLB bool
487 for _, a := range addrs {
488 if a.Type == resolver.GRPCLB {
489 isGRPCLB = true
490 break
491 }
492 }
493 var newBalancerName string
494 if isGRPCLB {
495 newBalancerName = grpclbName
496 } else {
497 // Address list doesn't contain grpclb address. Try to pick a
498 // non-grpclb balancer.
499 newBalancerName = cc.curBalancerName
500 // If current balancer is grpclb, switch to the previous one.
501 if newBalancerName == grpclbName {
502 newBalancerName = cc.preBalancerName
503 }
504 // The following could be true in two cases:
505 // - the first time handling resolved addresses
506 // (curBalancerName="")
507 // - the first time handling non-grpclb addresses
508 // (curBalancerName="grpclb", preBalancerName="")
509 if newBalancerName == "" {
510 newBalancerName = PickFirstBalancerName
511 }
512 }
513 cc.switchBalancer(newBalancerName)
514 } else if cc.balancerWrapper == nil {
515 // Balancer dial option was set, and this is the first time handling
516 // resolved addresses. Build a balancer with dopts.balancerBuilder.
517 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
518 }
519
520 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
521}
522
523// switchBalancer starts the switching from current balancer to the balancer
524// with the given name.
525//
526// It will NOT send the current address list to the new balancer. If needed,
527// caller of this function should send address list to the new balancer after
528// this function returns.
529//
530// Caller must hold cc.mu.
531func (cc *ClientConn) switchBalancer(name string) {
532 if cc.conns == nil {
533 return
534 }
535
536 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
537 return
538 }
539
540 grpclog.Infof("ClientConn switching balancer to %q", name)
541 if cc.dopts.balancerBuilder != nil {
542 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
543 return
544 }
545 // TODO(bar switching) change this to two steps: drain and close.
546 // Keep track of sc in wrapper.
547 if cc.balancerWrapper != nil {
548 cc.balancerWrapper.close()
549 }
550
551 builder := balancer.Get(name)
552 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
553 // we reuse previous one?
554 if channelz.IsOn() {
555 if builder == nil {
556 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
557 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
558 Severity: channelz.CtWarning,
559 })
560 } else {
561 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
562 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
563 Severity: channelz.CtINFO,
564 })
565 }
566 }
567 if builder == nil {
568 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
569 builder = newPickfirstBuilder()
570 }
571
572 cc.preBalancerName = cc.curBalancerName
573 cc.curBalancerName = builder.Name()
574 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
575}
576
577func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
578 cc.mu.Lock()
579 if cc.conns == nil {
580 cc.mu.Unlock()
581 return
582 }
583 // TODO(bar switching) send updates to all balancer wrappers when balancer
584 // gracefully switching is supported.
585 cc.balancerWrapper.handleSubConnStateChange(sc, s)
586 cc.mu.Unlock()
587}
588
589// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
590//
591// Caller needs to make sure len(addrs) > 0.
592func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
593 ac := &addrConn{
594 cc: cc,
595 addrs: addrs,
596 scopts: opts,
597 dopts: cc.dopts,
598 czData: new(channelzData),
599 resetBackoff: make(chan struct{}),
600 }
601 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
602 // Track ac in cc. This needs to be done before any getTransport(...) is called.
603 cc.mu.Lock()
604 if cc.conns == nil {
605 cc.mu.Unlock()
606 return nil, ErrClientConnClosing
607 }
608 if channelz.IsOn() {
609 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
610 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
611 Desc: "Subchannel Created",
612 Severity: channelz.CtINFO,
613 Parent: &channelz.TraceEventDesc{
614 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
615 Severity: channelz.CtINFO,
616 },
617 })
618 }
619 cc.conns[ac] = struct{}{}
620 cc.mu.Unlock()
621 return ac, nil
622}
623
624// removeAddrConn removes the addrConn in the subConn from clientConn.
625// It also tears down the ac with the given error.
626func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
627 cc.mu.Lock()
628 if cc.conns == nil {
629 cc.mu.Unlock()
630 return
631 }
632 delete(cc.conns, ac)
633 cc.mu.Unlock()
634 ac.tearDown(err)
635}
636
637func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
638 return &channelz.ChannelInternalMetric{
639 State: cc.GetState(),
640 Target: cc.target,
641 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
642 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
643 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
644 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
645 }
646}
647
648// Target returns the target string of the ClientConn.
649// This is an EXPERIMENTAL API.
650func (cc *ClientConn) Target() string {
651 return cc.target
652}
653
654func (cc *ClientConn) incrCallsStarted() {
655 atomic.AddInt64(&cc.czData.callsStarted, 1)
656 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
657}
658
659func (cc *ClientConn) incrCallsSucceeded() {
660 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
661}
662
663func (cc *ClientConn) incrCallsFailed() {
664 atomic.AddInt64(&cc.czData.callsFailed, 1)
665}
666
667// connect starts creating a transport.
668// It does nothing if the ac is not IDLE.
669// TODO(bar) Move this to the addrConn section.
670func (ac *addrConn) connect() error {
671 ac.mu.Lock()
672 if ac.state == connectivity.Shutdown {
673 ac.mu.Unlock()
674 return errConnClosing
675 }
676 if ac.state != connectivity.Idle {
677 ac.mu.Unlock()
678 return nil
679 }
680 ac.updateConnectivityState(connectivity.Connecting)
681 ac.mu.Unlock()
682
683 // Start a goroutine connecting to the server asynchronously.
684 go ac.resetTransport()
685 return nil
686}
687
688// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
689//
690// It checks whether current connected address of ac is in the new addrs list.
691// - If true, it updates ac.addrs and returns true. The ac will keep using
692// the existing connection.
693// - If false, it does nothing and returns false.
694func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
695 ac.mu.Lock()
696 defer ac.mu.Unlock()
697 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
698 if ac.state == connectivity.Shutdown {
699 ac.addrs = addrs
700 return true
701 }
702
703 // Unless we're busy reconnecting already, let's reconnect from the top of
704 // the list.
705 if ac.state != connectivity.Ready {
706 return false
707 }
708
709 var curAddrFound bool
710 for _, a := range addrs {
711 if reflect.DeepEqual(ac.curAddr, a) {
712 curAddrFound = true
713 break
714 }
715 }
716 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
717 if curAddrFound {
718 ac.addrs = addrs
719 }
720
721 return curAddrFound
722}
723
724// GetMethodConfig gets the method config of the input method.
725// If there's an exact match for input method (i.e. /service/method), we return
726// the corresponding MethodConfig.
727// If there isn't an exact match for the input method, we look for the default config
728// under the service (i.e /service/). If there is a default MethodConfig for
729// the service, we return it.
730// Otherwise, we return an empty MethodConfig.
731func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
732 // TODO: Avoid the locking here.
733 cc.mu.RLock()
734 defer cc.mu.RUnlock()
735 m, ok := cc.sc.Methods[method]
736 if !ok {
737 i := strings.LastIndex(method, "/")
738 m = cc.sc.Methods[method[:i+1]]
739 }
740 return m
741}
742
743func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
744 cc.mu.RLock()
745 defer cc.mu.RUnlock()
746 return cc.sc.healthCheckConfig
747}
748
749func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
750 hdr, _ := metadata.FromOutgoingContext(ctx)
751 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
752 FullMethodName: method,
753 Header: hdr,
754 })
755 if err != nil {
756 return nil, nil, toRPCErr(err)
757 }
758 return t, done, nil
759}
760
761// handleServiceConfig parses the service config string in JSON format to Go native
762// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
763func (cc *ClientConn) handleServiceConfig(js string) error {
764 if cc.dopts.disableServiceConfig {
765 return nil
766 }
767 if cc.scRaw == js {
768 return nil
769 }
770 if channelz.IsOn() {
771 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
772 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
773 // for human consumption.
774 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
775 Severity: channelz.CtINFO,
776 })
777 }
778 sc, err := parseServiceConfig(js)
779 if err != nil {
780 return err
781 }
782 cc.mu.Lock()
783 // Check if the ClientConn is already closed. Some fields (e.g.
784 // balancerWrapper) are set to nil when closing the ClientConn, and could
785 // cause nil pointer panic if we don't have this check.
786 if cc.conns == nil {
787 cc.mu.Unlock()
788 return nil
789 }
790 cc.scRaw = js
791 cc.sc = sc
792
793 if sc.retryThrottling != nil {
794 newThrottler := &retryThrottler{
795 tokens: sc.retryThrottling.MaxTokens,
796 max: sc.retryThrottling.MaxTokens,
797 thresh: sc.retryThrottling.MaxTokens / 2,
798 ratio: sc.retryThrottling.TokenRatio,
799 }
800 cc.retryThrottler.Store(newThrottler)
801 } else {
802 cc.retryThrottler.Store((*retryThrottler)(nil))
803 }
804
805 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
806 if cc.curBalancerName == grpclbName {
807 // If current balancer is grpclb, there's at least one grpclb
808 // balancer address in the resolved list. Don't switch the balancer,
809 // but change the previous balancer name, so if a new resolved
810 // address list doesn't contain grpclb address, balancer will be
811 // switched to *sc.LB.
812 cc.preBalancerName = *sc.LB
813 } else {
814 cc.switchBalancer(*sc.LB)
815 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
816 }
817 }
818
819 cc.mu.Unlock()
820 return nil
821}
822
823func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
824 cc.mu.RLock()
825 r := cc.resolverWrapper
826 cc.mu.RUnlock()
827 if r == nil {
828 return
829 }
830 go r.resolveNow(o)
831}
832
833// ResetConnectBackoff wakes up all subchannels in transient failure and causes
834// them to attempt another connection immediately. It also resets the backoff
835// times used for subsequent attempts regardless of the current state.
836//
837// In general, this function should not be used. Typical service or network
838// outages result in a reasonable client reconnection strategy by default.
839// However, if a previously unavailable network becomes available, this may be
840// used to trigger an immediate reconnect.
841//
842// This API is EXPERIMENTAL.
843func (cc *ClientConn) ResetConnectBackoff() {
844 cc.mu.Lock()
845 defer cc.mu.Unlock()
846 for ac := range cc.conns {
847 ac.resetConnectBackoff()
848 }
849}
850
851// Close tears down the ClientConn and all underlying connections.
852func (cc *ClientConn) Close() error {
853 defer cc.cancel()
854
855 cc.mu.Lock()
856 if cc.conns == nil {
857 cc.mu.Unlock()
858 return ErrClientConnClosing
859 }
860 conns := cc.conns
861 cc.conns = nil
862 cc.csMgr.updateState(connectivity.Shutdown)
863
864 rWrapper := cc.resolverWrapper
865 cc.resolverWrapper = nil
866 bWrapper := cc.balancerWrapper
867 cc.balancerWrapper = nil
868 cc.mu.Unlock()
869
870 cc.blockingpicker.close()
871
872 if rWrapper != nil {
873 rWrapper.close()
874 }
875 if bWrapper != nil {
876 bWrapper.close()
877 }
878
879 for ac := range conns {
880 ac.tearDown(ErrClientConnClosing)
881 }
882 if channelz.IsOn() {
883 ted := &channelz.TraceEventDesc{
884 Desc: "Channel Deleted",
885 Severity: channelz.CtINFO,
886 }
887 if cc.dopts.channelzParentID != 0 {
888 ted.Parent = &channelz.TraceEventDesc{
889 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
890 Severity: channelz.CtINFO,
891 }
892 }
893 channelz.AddTraceEvent(cc.channelzID, ted)
894 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
895 // the entity beng deleted, and thus prevent it from being deleted right away.
896 channelz.RemoveEntry(cc.channelzID)
897 }
898 return nil
899}
900
901// addrConn is a network connection to a given address.
902type addrConn struct {
903 ctx context.Context
904 cancel context.CancelFunc
905
906 cc *ClientConn
907 dopts dialOptions
908 acbw balancer.SubConn
909 scopts balancer.NewSubConnOptions
910
911 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
912 // health checking may require server to report healthy to set ac to READY), and is reset
913 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
914 // is received, transport is closed, ac has been torn down).
915 transport transport.ClientTransport // The current transport.
916
917 mu sync.Mutex
918 curAddr resolver.Address // The current address.
919 addrs []resolver.Address // All addresses that the resolver resolved to.
920
921 // Use updateConnectivityState for updating addrConn's connectivity state.
922 state connectivity.State
923
924 tearDownErr error // The reason this addrConn is torn down.
925
926 backoffIdx int // Needs to be stateful for resetConnectBackoff.
927 resetBackoff chan struct{}
928
929 channelzID int64 // channelz unique identification number.
930 czData *channelzData
931}
932
933// Note: this requires a lock on ac.mu.
934func (ac *addrConn) updateConnectivityState(s connectivity.State) {
935 if ac.state == s {
936 return
937 }
938
939 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
940 ac.state = s
941 if channelz.IsOn() {
942 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
943 Desc: updateMsg,
944 Severity: channelz.CtINFO,
945 })
946 }
947 ac.cc.handleSubConnStateChange(ac.acbw, s)
948}
949
950// adjustParams updates parameters used to create transports upon
951// receiving a GoAway.
952func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
953 switch r {
954 case transport.GoAwayTooManyPings:
955 v := 2 * ac.dopts.copts.KeepaliveParams.Time
956 ac.cc.mu.Lock()
957 if v > ac.cc.mkp.Time {
958 ac.cc.mkp.Time = v
959 }
960 ac.cc.mu.Unlock()
961 }
962}
963
964func (ac *addrConn) resetTransport() {
965 for i := 0; ; i++ {
966 tryNextAddrFromStart := grpcsync.NewEvent()
967
968 ac.mu.Lock()
969 if i > 0 {
970 ac.cc.resolveNow(resolver.ResolveNowOption{})
971 }
972 addrs := ac.addrs
973 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
974
975 // This will be the duration that dial gets to finish.
976 dialDuration := getMinConnectTimeout()
977 if dialDuration < backoffFor {
978 // Give dial more time as we keep failing to connect.
979 dialDuration = backoffFor
980 }
981 connectDeadline := time.Now().Add(dialDuration)
982 ac.mu.Unlock()
983
984 addrLoop:
985 for _, addr := range addrs {
986 ac.mu.Lock()
987
988 if ac.state == connectivity.Shutdown {
989 ac.mu.Unlock()
990 return
991 }
992 ac.updateConnectivityState(connectivity.Connecting)
993 ac.transport = nil
994
995 ac.cc.mu.RLock()
996 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
997 ac.cc.mu.RUnlock()
998
999 if ac.state == connectivity.Shutdown {
1000 ac.mu.Unlock()
1001 return
1002 }
1003
1004 copts := ac.dopts.copts
1005 if ac.scopts.CredsBundle != nil {
1006 copts.CredsBundle = ac.scopts.CredsBundle
1007 }
1008 hctx, hcancel := context.WithCancel(ac.ctx)
1009 defer hcancel()
1010 ac.mu.Unlock()
1011
1012 if channelz.IsOn() {
1013 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1014 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1015 Severity: channelz.CtINFO,
1016 })
1017 }
1018
1019 reconnect := grpcsync.NewEvent()
1020 prefaceReceived := make(chan struct{})
1021 newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1022 if err == nil {
1023 ac.mu.Lock()
1024 ac.curAddr = addr
1025 ac.transport = newTr
1026 ac.mu.Unlock()
1027
1028 healthCheckConfig := ac.cc.healthCheckConfig()
1029 // LB channel health checking is only enabled when all the four requirements below are met:
1030 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1031 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1032 // 3. a service config with non-empty healthCheckConfig field is provided,
1033 // 4. the current load balancer allows it.
1034 healthcheckManagingState := false
1035 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1036 if ac.cc.dopts.healthCheckFunc == nil {
1037 // TODO: add a link to the health check doc in the error message.
1038 grpclog.Error("the client side LB channel health check function has not been set.")
1039 } else {
1040 // TODO(deklerk) refactor to just return transport
1041 go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1042 healthcheckManagingState = true
1043 }
1044 }
1045 if !healthcheckManagingState {
1046 ac.mu.Lock()
1047 ac.updateConnectivityState(connectivity.Ready)
1048 ac.mu.Unlock()
1049 }
1050 } else {
1051 hcancel()
1052 if err == errConnClosing {
1053 return
1054 }
1055
1056 if tryNextAddrFromStart.HasFired() {
1057 break addrLoop
1058 }
1059 continue
1060 }
1061
1062 backoffFor = 0
1063 ac.mu.Lock()
1064 reqHandshake := ac.dopts.reqHandshake
1065 ac.mu.Unlock()
1066
1067 <-reconnect.Done()
1068 hcancel()
1069
1070 if reqHandshake == envconfig.RequireHandshakeHybrid {
1071 // In RequireHandshakeHybrid mode, we must check to see whether
1072 // server preface has arrived yet to decide whether to start
1073 // reconnecting at the top of the list (server preface received)
1074 // or continue with the next addr in the list as if the
1075 // connection were not successful (server preface not received).
1076 select {
1077 case <-prefaceReceived:
1078 // We received a server preface - huzzah! We consider this
1079 // a success and restart from the top of the addr list.
1080 ac.mu.Lock()
1081 ac.backoffIdx = 0
1082 ac.mu.Unlock()
1083 break addrLoop
1084 default:
1085 // Despite having set state to READY, in hybrid mode we
1086 // consider this a failure and continue connecting at the
1087 // next addr in the list.
1088 ac.mu.Lock()
1089 if ac.state == connectivity.Shutdown {
1090 ac.mu.Unlock()
1091 return
1092 }
1093
1094 ac.updateConnectivityState(connectivity.TransientFailure)
1095 ac.mu.Unlock()
1096
1097 if tryNextAddrFromStart.HasFired() {
1098 break addrLoop
1099 }
1100 }
1101 } else {
1102 // In RequireHandshakeOn mode, we would have already waited for
1103 // the server preface, so we consider this a success and restart
1104 // from the top of the addr list. In RequireHandshakeOff mode,
1105 // we don't care to wait for the server preface before
1106 // considering this a success, so we also restart from the top
1107 // of the addr list.
1108 ac.mu.Lock()
1109 ac.backoffIdx = 0
1110 ac.mu.Unlock()
1111 break addrLoop
1112 }
1113 }
1114
1115 // After exhausting all addresses, or after need to reconnect after a
1116 // READY, the addrConn enters TRANSIENT_FAILURE.
1117 ac.mu.Lock()
1118 if ac.state == connectivity.Shutdown {
1119 ac.mu.Unlock()
1120 return
1121 }
1122 ac.updateConnectivityState(connectivity.TransientFailure)
1123
1124 // Backoff.
1125 b := ac.resetBackoff
1126 timer := time.NewTimer(backoffFor)
1127 acctx := ac.ctx
1128 ac.mu.Unlock()
1129
1130 select {
1131 case <-timer.C:
1132 ac.mu.Lock()
1133 ac.backoffIdx++
1134 ac.mu.Unlock()
1135 case <-b:
1136 timer.Stop()
1137 case <-acctx.Done():
1138 timer.Stop()
1139 return
1140 }
1141 }
1142}
1143
1144// createTransport creates a connection to one of the backends in addrs. It
1145// sets ac.transport in the success case, or it returns an error if it was
1146// unable to successfully create a transport.
1147//
1148// If waitForHandshake is enabled, it blocks until server preface arrives.
1149func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
1150 onCloseCalled := make(chan struct{})
1151
1152 target := transport.TargetInfo{
1153 Addr: addr.Addr,
1154 Metadata: addr.Metadata,
1155 Authority: ac.cc.authority,
1156 }
1157
1158 prefaceTimer := time.NewTimer(time.Until(connectDeadline))
1159
1160 onGoAway := func(r transport.GoAwayReason) {
1161 ac.mu.Lock()
1162 ac.adjustParams(r)
1163 ac.mu.Unlock()
1164 reconnect.Fire()
1165 }
1166
1167 onClose := func() {
1168 close(onCloseCalled)
1169 prefaceTimer.Stop()
1170 reconnect.Fire()
1171 }
1172
1173 onPrefaceReceipt := func() {
1174 close(prefaceReceived)
1175 prefaceTimer.Stop()
1176 }
1177
1178 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1179 defer cancel()
1180 if channelz.IsOn() {
1181 copts.ChannelzParentID = ac.channelzID
1182 }
1183
1184 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1185
1186 if err == nil {
1187 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1188 select {
1189 case <-prefaceTimer.C:
1190 // We didn't get the preface in time.
1191 newTr.Close()
1192 err = errors.New("timed out waiting for server handshake")
1193 case <-prefaceReceived:
1194 // We got the preface - huzzah! things are good.
1195 case <-onCloseCalled:
1196 // The transport has already closed - noop.
1197 return nil, errors.New("connection closed")
1198 }
1199 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1200 go func() {
1201 select {
1202 case <-prefaceTimer.C:
1203 // We didn't get the preface in time.
1204 newTr.Close()
1205 case <-prefaceReceived:
1206 // We got the preface just in the nick of time - huzzah!
1207 case <-onCloseCalled:
1208 // The transport has already closed - noop.
1209 }
1210 }()
1211 }
1212 }
1213
1214 if err != nil {
1215 // newTr is either nil, or closed.
1216 ac.cc.blockingpicker.updateConnectionError(err)
1217 ac.mu.Lock()
1218 if ac.state == connectivity.Shutdown {
1219 // ac.tearDown(...) has been invoked.
1220 ac.mu.Unlock()
1221
1222 return nil, errConnClosing
1223 }
1224 ac.mu.Unlock()
1225 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1226 return nil, err
1227 }
1228
1229 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1230 ac.mu.Lock()
1231 if ac.state == connectivity.Shutdown {
1232 ac.mu.Unlock()
1233 newTr.Close()
1234 return nil, errConnClosing
1235 }
1236 ac.mu.Unlock()
1237
1238 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1239 ac.mu.Lock()
1240 if ac.state == connectivity.Shutdown {
1241 ac.mu.Unlock()
1242 newTr.Close()
1243 return nil, errConnClosing
1244 }
1245 ac.mu.Unlock()
1246
1247 return newTr, nil
1248}
1249
1250func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1251 // Set up the health check helper functions
1252 newStream := func() (interface{}, error) {
1253 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1254 }
1255 firstReady := true
1256 reportHealth := func(ok bool) {
1257 ac.mu.Lock()
1258 defer ac.mu.Unlock()
1259 if ac.transport != newTr {
1260 return
1261 }
1262 if ok {
1263 if firstReady {
1264 firstReady = false
1265 ac.curAddr = addr
1266 }
1267 ac.updateConnectivityState(connectivity.Ready)
1268 } else {
1269 ac.updateConnectivityState(connectivity.TransientFailure)
1270 }
1271 }
1272 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
1273 if err != nil {
1274 if status.Code(err) == codes.Unimplemented {
1275 if channelz.IsOn() {
1276 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1277 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1278 Severity: channelz.CtError,
1279 })
1280 }
1281 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1282 } else {
1283 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1284 }
1285 }
1286}
1287
1288func (ac *addrConn) resetConnectBackoff() {
1289 ac.mu.Lock()
1290 close(ac.resetBackoff)
1291 ac.backoffIdx = 0
1292 ac.resetBackoff = make(chan struct{})
1293 ac.mu.Unlock()
1294}
1295
1296// getReadyTransport returns the transport if ac's state is READY.
1297// Otherwise it returns nil, false.
1298// If ac's state is IDLE, it will trigger ac to connect.
1299func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1300 ac.mu.Lock()
1301 if ac.state == connectivity.Ready && ac.transport != nil {
1302 t := ac.transport
1303 ac.mu.Unlock()
1304 return t, true
1305 }
1306 var idle bool
1307 if ac.state == connectivity.Idle {
1308 idle = true
1309 }
1310 ac.mu.Unlock()
1311 // Trigger idle ac to connect.
1312 if idle {
1313 ac.connect()
1314 }
1315 return nil, false
1316}
1317
// tearDown starts to tear down the addrConn: it moves the addrConn to the
// Shutdown state, calls ac.cancel, records err as the teardown reason and
// clears the current address. When err is errConnDrain and a transport is
// present, the transport is closed gracefully. Calling tearDown on an
// addrConn that is already in the Shutdown state is a no-op.
//
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	// Detach the transport first so that no one else picks it up while we
	// are shutting down; curTr keeps it around for the graceful close below.
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancelation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.tearDownErr = err
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1362
1363func (ac *addrConn) getState() connectivity.State {
1364 ac.mu.Lock()
1365 defer ac.mu.Unlock()
1366 return ac.state
1367}
1368
1369func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1370 ac.mu.Lock()
1371 addr := ac.curAddr.Addr
1372 ac.mu.Unlock()
1373 return &channelz.ChannelInternalMetric{
1374 State: ac.getState(),
1375 Target: addr,
1376 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1377 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1378 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1379 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1380 }
1381}
1382
1383func (ac *addrConn) incrCallsStarted() {
1384 atomic.AddInt64(&ac.czData.callsStarted, 1)
1385 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1386}
1387
1388func (ac *addrConn) incrCallsSucceeded() {
1389 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1390}
1391
1392func (ac *addrConn) incrCallsFailed() {
1393 atomic.AddInt64(&ac.czData.callsFailed, 1)
1394}
1395
1396type retryThrottler struct {
1397 max float64
1398 thresh float64
1399 ratio float64
1400
1401 mu sync.Mutex
1402 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1403}
1404
1405// throttle subtracts a retry token from the pool and returns whether a retry
1406// should be throttled (disallowed) based upon the retry throttling policy in
1407// the service config.
1408func (rt *retryThrottler) throttle() bool {
1409 if rt == nil {
1410 return false
1411 }
1412 rt.mu.Lock()
1413 defer rt.mu.Unlock()
1414 rt.tokens--
1415 if rt.tokens < 0 {
1416 rt.tokens = 0
1417 }
1418 return rt.tokens <= rt.thresh
1419}
1420
1421func (rt *retryThrottler) successfulRPC() {
1422 if rt == nil {
1423 return
1424 }
1425 rt.mu.Lock()
1426 defer rt.mu.Unlock()
1427 rt.tokens += rt.ratio
1428 if rt.tokens > rt.max {
1429 rt.tokens = rt.max
1430 }
1431}
1432
1433type channelzChannel struct {
1434 cc *ClientConn
1435}
1436
1437func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1438 return c.cc.channelzMetric()
1439}
1440
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")