blob: 56d0bf713d73ea91c8c78b76fc3a1b6220a3e65e [file] [log] [blame]
William Kurkian6ea97f82019-03-13 15:51:55 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/envconfig"
42 "google.golang.org/grpc/internal/grpcsync"
43 "google.golang.org/grpc/internal/transport"
44 "google.golang.org/grpc/keepalive"
45 "google.golang.org/grpc/metadata"
46 "google.golang.org/grpc/resolver"
47 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
48 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
49 "google.golang.org/grpc/status"
50)
51
52const (
53 // minimum time to give a connection to complete
54 minConnectTimeout = 20 * time.Second
55 // must match grpclbName in grpclb/grpclb.go
56 grpclbName = "grpclb"
57)
58
59var (
60 // ErrClientConnClosing indicates that the operation is illegal because
61 // the ClientConn is closing.
62 //
63 // Deprecated: this error should not be relied upon by users; use the status
64 // code of Canceled instead.
65 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
66 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
67 errConnDrain = errors.New("grpc: the connection is drained")
68 // errConnClosing indicates that the connection is closing.
69 errConnClosing = errors.New("grpc: the connection is closing")
70 // errBalancerClosed indicates that the balancer is closed.
71 errBalancerClosed = errors.New("grpc: balancer is closed")
72 // We use an accessor so that minConnectTimeout can be
73 // atomically read and updated while testing.
74 getMinConnectTimeout = func() time.Duration {
75 return minConnectTimeout
76 }
77)
78
79// The following errors are returned from Dial and DialContext
80var (
81 // errNoTransportSecurity indicates that there is no transport security
82 // being set for ClientConn. Users should either set one or explicitly
83 // call WithInsecure DialOption to disable security.
84 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
85 // errTransportCredsAndBundle indicates that creds bundle is used together
86 // with other individual Transport Credentials.
87 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
88 // errTransportCredentialsMissing indicates that users want to transmit security
89 // information (e.g., oauth2 token) which requires secure connection on an insecure
90 // connection.
91 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
92 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
93 // and grpc.WithInsecure() are both called for a connection.
94 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
95)
96
97const (
98 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
99 defaultClientMaxSendMessageSize = math.MaxInt32
100 // http2IOBufSize specifies the buffer size for sending frames.
101 defaultWriteBufSize = 32 * 1024
102 defaultReadBufSize = 32 * 1024
103)
104
105// Dial creates a client connection to the given target.
106func Dial(target string, opts ...DialOption) (*ClientConn, error) {
107 return DialContext(context.Background(), target, opts...)
108}
109
110// DialContext creates a client connection to the given target. By default, it's
111// a non-blocking dial (the function won't wait for connections to be
112// established, and connecting happens in the background). To make it a blocking
113// dial, use WithBlock() dial option.
114//
115// In the non-blocking case, the ctx does not act against the connection. It
116// only controls the setup steps.
117//
118// In the blocking case, ctx can be used to cancel or expire the pending
119// connection. Once this function returns, the cancellation and expiration of
120// ctx will be noop. Users should call ClientConn.Close to terminate all the
121// pending operations after this function returns.
122//
123// The target name syntax is defined in
124// https://github.com/grpc/grpc/blob/master/doc/naming.md.
125// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
126func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
127 cc := &ClientConn{
128 target: target,
129 csMgr: &connectivityStateManager{},
130 conns: make(map[*addrConn]struct{}),
131 dopts: defaultDialOptions(),
132 blockingpicker: newPickerWrapper(),
133 czData: new(channelzData),
134 firstResolveEvent: grpcsync.NewEvent(),
135 }
136 cc.retryThrottler.Store((*retryThrottler)(nil))
137 cc.ctx, cc.cancel = context.WithCancel(context.Background())
138
139 for _, opt := range opts {
140 opt.apply(&cc.dopts)
141 }
142
143 if channelz.IsOn() {
144 if cc.dopts.channelzParentID != 0 {
145 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
146 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
147 Desc: "Channel Created",
148 Severity: channelz.CtINFO,
149 Parent: &channelz.TraceEventDesc{
150 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
151 Severity: channelz.CtINFO,
152 },
153 })
154 } else {
155 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
156 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
157 Desc: "Channel Created",
158 Severity: channelz.CtINFO,
159 })
160 }
161 cc.csMgr.channelzID = cc.channelzID
162 }
163
164 if !cc.dopts.insecure {
165 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
166 return nil, errNoTransportSecurity
167 }
168 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
169 return nil, errTransportCredsAndBundle
170 }
171 } else {
172 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
173 return nil, errCredentialsConflict
174 }
175 for _, cd := range cc.dopts.copts.PerRPCCredentials {
176 if cd.RequireTransportSecurity() {
177 return nil, errTransportCredentialsMissing
178 }
179 }
180 }
181
182 cc.mkp = cc.dopts.copts.KeepaliveParams
183
184 if cc.dopts.copts.Dialer == nil {
185 cc.dopts.copts.Dialer = newProxyDialer(
186 func(ctx context.Context, addr string) (net.Conn, error) {
187 network, addr := parseDialTarget(addr)
188 return (&net.Dialer{}).DialContext(ctx, network, addr)
189 },
190 )
191 }
192
193 if cc.dopts.copts.UserAgent != "" {
194 cc.dopts.copts.UserAgent += " " + grpcUA
195 } else {
196 cc.dopts.copts.UserAgent = grpcUA
197 }
198
199 if cc.dopts.timeout > 0 {
200 var cancel context.CancelFunc
201 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
202 defer cancel()
203 }
204
205 defer func() {
206 select {
207 case <-ctx.Done():
208 conn, err = nil, ctx.Err()
209 default:
210 }
211
212 if err != nil {
213 cc.Close()
214 }
215 }()
216
217 scSet := false
218 if cc.dopts.scChan != nil {
219 // Try to get an initial service config.
220 select {
221 case sc, ok := <-cc.dopts.scChan:
222 if ok {
223 cc.sc = sc
224 scSet = true
225 }
226 default:
227 }
228 }
229 if cc.dopts.bs == nil {
230 cc.dopts.bs = backoff.Exponential{
231 MaxDelay: DefaultBackoffConfig.MaxDelay,
232 }
233 }
234 if cc.dopts.resolverBuilder == nil {
235 // Only try to parse target when resolver builder is not already set.
236 cc.parsedTarget = parseTarget(cc.target)
237 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
238 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
239 if cc.dopts.resolverBuilder == nil {
240 // If resolver builder is still nil, the parse target's scheme is
241 // not registered. Fallback to default resolver and set Endpoint to
242 // the original unparsed target.
243 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
244 cc.parsedTarget = resolver.Target{
245 Scheme: resolver.GetDefaultScheme(),
246 Endpoint: target,
247 }
248 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
249 }
250 } else {
251 cc.parsedTarget = resolver.Target{Endpoint: target}
252 }
253 creds := cc.dopts.copts.TransportCredentials
254 if creds != nil && creds.Info().ServerName != "" {
255 cc.authority = creds.Info().ServerName
256 } else if cc.dopts.insecure && cc.dopts.authority != "" {
257 cc.authority = cc.dopts.authority
258 } else {
259 // Use endpoint from "scheme://authority/endpoint" as the default
260 // authority for ClientConn.
261 cc.authority = cc.parsedTarget.Endpoint
262 }
263
264 if cc.dopts.scChan != nil && !scSet {
265 // Blocking wait for the initial service config.
266 select {
267 case sc, ok := <-cc.dopts.scChan:
268 if ok {
269 cc.sc = sc
270 }
271 case <-ctx.Done():
272 return nil, ctx.Err()
273 }
274 }
275 if cc.dopts.scChan != nil {
276 go cc.scWatcher()
277 }
278
279 var credsClone credentials.TransportCredentials
280 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
281 credsClone = creds.Clone()
282 }
283 cc.balancerBuildOpts = balancer.BuildOptions{
284 DialCreds: credsClone,
285 CredsBundle: cc.dopts.copts.CredsBundle,
286 Dialer: cc.dopts.copts.Dialer,
287 ChannelzParentID: cc.channelzID,
288 }
289
290 // Build the resolver.
291 rWrapper, err := newCCResolverWrapper(cc)
292 if err != nil {
293 return nil, fmt.Errorf("failed to build resolver: %v", err)
294 }
295
296 cc.mu.Lock()
297 cc.resolverWrapper = rWrapper
298 cc.mu.Unlock()
299 // A blocking dial blocks until the clientConn is ready.
300 if cc.dopts.block {
301 for {
302 s := cc.GetState()
303 if s == connectivity.Ready {
304 break
305 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
306 if err = cc.blockingpicker.connectionError(); err != nil {
307 terr, ok := err.(interface {
308 Temporary() bool
309 })
310 if ok && !terr.Temporary() {
311 return nil, err
312 }
313 }
314 }
315 if !cc.WaitForStateChange(ctx, s) {
316 // ctx got timeout or canceled.
317 return nil, ctx.Err()
318 }
319 }
320 }
321
322 return cc, nil
323}
324
325// connectivityStateManager keeps the connectivity.State of ClientConn.
326// This struct will eventually be exported so the balancers can access it.
327type connectivityStateManager struct {
328 mu sync.Mutex
329 state connectivity.State
330 notifyChan chan struct{}
331 channelzID int64
332}
333
334// updateState updates the connectivity.State of ClientConn.
335// If there's a change it notifies goroutines waiting on state change to
336// happen.
337func (csm *connectivityStateManager) updateState(state connectivity.State) {
338 csm.mu.Lock()
339 defer csm.mu.Unlock()
340 if csm.state == connectivity.Shutdown {
341 return
342 }
343 if csm.state == state {
344 return
345 }
346 csm.state = state
347 if channelz.IsOn() {
348 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
350 Severity: channelz.CtINFO,
351 })
352 }
353 if csm.notifyChan != nil {
354 // There are other goroutines waiting on this channel.
355 close(csm.notifyChan)
356 csm.notifyChan = nil
357 }
358}
359
360func (csm *connectivityStateManager) getState() connectivity.State {
361 csm.mu.Lock()
362 defer csm.mu.Unlock()
363 return csm.state
364}
365
366func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
367 csm.mu.Lock()
368 defer csm.mu.Unlock()
369 if csm.notifyChan == nil {
370 csm.notifyChan = make(chan struct{})
371 }
372 return csm.notifyChan
373}
374
375// ClientConn represents a client connection to an RPC server.
376type ClientConn struct {
377 ctx context.Context
378 cancel context.CancelFunc
379
380 target string
381 parsedTarget resolver.Target
382 authority string
383 dopts dialOptions
384 csMgr *connectivityStateManager
385
386 balancerBuildOpts balancer.BuildOptions
387 blockingpicker *pickerWrapper
388
389 mu sync.RWMutex
390 resolverWrapper *ccResolverWrapper
391 sc ServiceConfig
392 scRaw string
393 conns map[*addrConn]struct{}
394 // Keepalive parameter can be updated if a GoAway is received.
395 mkp keepalive.ClientParameters
396 curBalancerName string
397 preBalancerName string // previous balancer name.
398 curAddresses []resolver.Address
399 balancerWrapper *ccBalancerWrapper
400 retryThrottler atomic.Value
401
402 firstResolveEvent *grpcsync.Event
403
404 channelzID int64 // channelz unique identification number
405 czData *channelzData
406}
407
408// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
409// ctx expires. A true value is returned in former case and false in latter.
410// This is an EXPERIMENTAL API.
411func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
412 ch := cc.csMgr.getNotifyChan()
413 if cc.csMgr.getState() != sourceState {
414 return true
415 }
416 select {
417 case <-ctx.Done():
418 return false
419 case <-ch:
420 return true
421 }
422}
423
424// GetState returns the connectivity.State of ClientConn.
425// This is an EXPERIMENTAL API.
426func (cc *ClientConn) GetState() connectivity.State {
427 return cc.csMgr.getState()
428}
429
430func (cc *ClientConn) scWatcher() {
431 for {
432 select {
433 case sc, ok := <-cc.dopts.scChan:
434 if !ok {
435 return
436 }
437 cc.mu.Lock()
438 // TODO: load balance policy runtime change is ignored.
439 // We may revist this decision in the future.
440 cc.sc = sc
441 cc.scRaw = ""
442 cc.mu.Unlock()
443 case <-cc.ctx.Done():
444 return
445 }
446 }
447}
448
449// waitForResolvedAddrs blocks until the resolver has provided addresses or the
450// context expires. Returns nil unless the context expires first; otherwise
451// returns a status error based on the context.
452func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
453 // This is on the RPC path, so we use a fast path to avoid the
454 // more-expensive "select" below after the resolver has returned once.
455 if cc.firstResolveEvent.HasFired() {
456 return nil
457 }
458 select {
459 case <-cc.firstResolveEvent.Done():
460 return nil
461 case <-ctx.Done():
462 return status.FromContextError(ctx.Err()).Err()
463 case <-cc.ctx.Done():
464 return ErrClientConnClosing
465 }
466}
467
468func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
469 cc.mu.Lock()
470 defer cc.mu.Unlock()
471 if cc.conns == nil {
472 // cc was closed.
473 return
474 }
475
476 if reflect.DeepEqual(cc.curAddresses, addrs) {
477 return
478 }
479
480 cc.curAddresses = addrs
481 cc.firstResolveEvent.Fire()
482
483 if cc.dopts.balancerBuilder == nil {
484 // Only look at balancer types and switch balancer if balancer dial
485 // option is not set.
486 var isGRPCLB bool
487 for _, a := range addrs {
488 if a.Type == resolver.GRPCLB {
489 isGRPCLB = true
490 break
491 }
492 }
493 var newBalancerName string
494 if isGRPCLB {
495 newBalancerName = grpclbName
496 } else {
497 // Address list doesn't contain grpclb address. Try to pick a
498 // non-grpclb balancer.
499 newBalancerName = cc.curBalancerName
500 // If current balancer is grpclb, switch to the previous one.
501 if newBalancerName == grpclbName {
502 newBalancerName = cc.preBalancerName
503 }
504 // The following could be true in two cases:
505 // - the first time handling resolved addresses
506 // (curBalancerName="")
507 // - the first time handling non-grpclb addresses
508 // (curBalancerName="grpclb", preBalancerName="")
509 if newBalancerName == "" {
510 newBalancerName = PickFirstBalancerName
511 }
512 }
513 cc.switchBalancer(newBalancerName)
514 } else if cc.balancerWrapper == nil {
515 // Balancer dial option was set, and this is the first time handling
516 // resolved addresses. Build a balancer with dopts.balancerBuilder.
517 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
518 }
519
520 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
521}
522
523// switchBalancer starts the switching from current balancer to the balancer
524// with the given name.
525//
526// It will NOT send the current address list to the new balancer. If needed,
527// caller of this function should send address list to the new balancer after
528// this function returns.
529//
530// Caller must hold cc.mu.
531func (cc *ClientConn) switchBalancer(name string) {
532 if cc.conns == nil {
533 return
534 }
535
536 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
537 return
538 }
539
540 grpclog.Infof("ClientConn switching balancer to %q", name)
541 if cc.dopts.balancerBuilder != nil {
542 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
543 return
544 }
545 // TODO(bar switching) change this to two steps: drain and close.
546 // Keep track of sc in wrapper.
547 if cc.balancerWrapper != nil {
548 cc.balancerWrapper.close()
549 }
550
551 builder := balancer.Get(name)
552 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
553 // we reuse previous one?
554 if channelz.IsOn() {
555 if builder == nil {
556 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
557 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
558 Severity: channelz.CtWarning,
559 })
560 } else {
561 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
562 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
563 Severity: channelz.CtINFO,
564 })
565 }
566 }
567 if builder == nil {
568 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
569 builder = newPickfirstBuilder()
570 }
571
572 cc.preBalancerName = cc.curBalancerName
573 cc.curBalancerName = builder.Name()
574 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
575}
576
577func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
578 cc.mu.Lock()
579 if cc.conns == nil {
580 cc.mu.Unlock()
581 return
582 }
583 // TODO(bar switching) send updates to all balancer wrappers when balancer
584 // gracefully switching is supported.
585 cc.balancerWrapper.handleSubConnStateChange(sc, s)
586 cc.mu.Unlock()
587}
588
589// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
590//
591// Caller needs to make sure len(addrs) > 0.
592func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
593 ac := &addrConn{
594 cc: cc,
595 addrs: addrs,
596 scopts: opts,
597 dopts: cc.dopts,
598 czData: new(channelzData),
599 resetBackoff: make(chan struct{}),
600 }
601 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
602 // Track ac in cc. This needs to be done before any getTransport(...) is called.
603 cc.mu.Lock()
604 if cc.conns == nil {
605 cc.mu.Unlock()
606 return nil, ErrClientConnClosing
607 }
608 if channelz.IsOn() {
609 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
610 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
611 Desc: "Subchannel Created",
612 Severity: channelz.CtINFO,
613 Parent: &channelz.TraceEventDesc{
614 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
615 Severity: channelz.CtINFO,
616 },
617 })
618 }
619 cc.conns[ac] = struct{}{}
620 cc.mu.Unlock()
621 return ac, nil
622}
623
624// removeAddrConn removes the addrConn in the subConn from clientConn.
625// It also tears down the ac with the given error.
626func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
627 cc.mu.Lock()
628 if cc.conns == nil {
629 cc.mu.Unlock()
630 return
631 }
632 delete(cc.conns, ac)
633 cc.mu.Unlock()
634 ac.tearDown(err)
635}
636
637func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
638 return &channelz.ChannelInternalMetric{
639 State: cc.GetState(),
640 Target: cc.target,
641 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
642 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
643 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
644 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
645 }
646}
647
648// Target returns the target string of the ClientConn.
649// This is an EXPERIMENTAL API.
650func (cc *ClientConn) Target() string {
651 return cc.target
652}
653
654func (cc *ClientConn) incrCallsStarted() {
655 atomic.AddInt64(&cc.czData.callsStarted, 1)
656 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
657}
658
659func (cc *ClientConn) incrCallsSucceeded() {
660 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
661}
662
663func (cc *ClientConn) incrCallsFailed() {
664 atomic.AddInt64(&cc.czData.callsFailed, 1)
665}
666
667// connect starts creating a transport.
668// It does nothing if the ac is not IDLE.
669// TODO(bar) Move this to the addrConn section.
670func (ac *addrConn) connect() error {
671 ac.mu.Lock()
672 if ac.state == connectivity.Shutdown {
673 ac.mu.Unlock()
674 return errConnClosing
675 }
676 if ac.state != connectivity.Idle {
677 ac.mu.Unlock()
678 return nil
679 }
680 ac.updateConnectivityState(connectivity.Connecting)
681 ac.mu.Unlock()
682
683 // Start a goroutine connecting to the server asynchronously.
684 go ac.resetTransport()
685 return nil
686}
687
688// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
689//
690// It checks whether current connected address of ac is in the new addrs list.
691// - If true, it updates ac.addrs and returns true. The ac will keep using
692// the existing connection.
693// - If false, it does nothing and returns false.
694func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
695 ac.mu.Lock()
696 defer ac.mu.Unlock()
697 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
698 if ac.state == connectivity.Shutdown {
699 ac.addrs = addrs
700 return true
701 }
702
703 // Unless we're busy reconnecting already, let's reconnect from the top of
704 // the list.
705 if ac.state != connectivity.Ready {
706 return false
707 }
708
709 var curAddrFound bool
710 for _, a := range addrs {
711 if reflect.DeepEqual(ac.curAddr, a) {
712 curAddrFound = true
713 break
714 }
715 }
716 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
717 if curAddrFound {
718 ac.addrs = addrs
719 }
720
721 return curAddrFound
722}
723
724// GetMethodConfig gets the method config of the input method.
725// If there's an exact match for input method (i.e. /service/method), we return
726// the corresponding MethodConfig.
727// If there isn't an exact match for the input method, we look for the default config
728// under the service (i.e /service/). If there is a default MethodConfig for
729// the service, we return it.
730// Otherwise, we return an empty MethodConfig.
731func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
732 // TODO: Avoid the locking here.
733 cc.mu.RLock()
734 defer cc.mu.RUnlock()
735 m, ok := cc.sc.Methods[method]
736 if !ok {
737 i := strings.LastIndex(method, "/")
738 m = cc.sc.Methods[method[:i+1]]
739 }
740 return m
741}
742
743func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
744 cc.mu.RLock()
745 defer cc.mu.RUnlock()
746 return cc.sc.healthCheckConfig
747}
748
749func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
750 hdr, _ := metadata.FromOutgoingContext(ctx)
751 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
752 FullMethodName: method,
753 Header: hdr,
754 })
755 if err != nil {
756 return nil, nil, toRPCErr(err)
757 }
758 return t, done, nil
759}
760
761// handleServiceConfig parses the service config string in JSON format to Go native
762// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
763func (cc *ClientConn) handleServiceConfig(js string) error {
764 if cc.dopts.disableServiceConfig {
765 return nil
766 }
767 if cc.scRaw == js {
768 return nil
769 }
770 if channelz.IsOn() {
771 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
772 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
773 // for human consumption.
774 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
775 Severity: channelz.CtINFO,
776 })
777 }
778 sc, err := parseServiceConfig(js)
779 if err != nil {
780 return err
781 }
782 cc.mu.Lock()
783 // Check if the ClientConn is already closed. Some fields (e.g.
784 // balancerWrapper) are set to nil when closing the ClientConn, and could
785 // cause nil pointer panic if we don't have this check.
786 if cc.conns == nil {
787 cc.mu.Unlock()
788 return nil
789 }
790 cc.scRaw = js
791 cc.sc = sc
792
793 if sc.retryThrottling != nil {
794 newThrottler := &retryThrottler{
795 tokens: sc.retryThrottling.MaxTokens,
796 max: sc.retryThrottling.MaxTokens,
797 thresh: sc.retryThrottling.MaxTokens / 2,
798 ratio: sc.retryThrottling.TokenRatio,
799 }
800 cc.retryThrottler.Store(newThrottler)
801 } else {
802 cc.retryThrottler.Store((*retryThrottler)(nil))
803 }
804
805 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
806 if cc.curBalancerName == grpclbName {
807 // If current balancer is grpclb, there's at least one grpclb
808 // balancer address in the resolved list. Don't switch the balancer,
809 // but change the previous balancer name, so if a new resolved
810 // address list doesn't contain grpclb address, balancer will be
811 // switched to *sc.LB.
812 cc.preBalancerName = *sc.LB
813 } else {
814 cc.switchBalancer(*sc.LB)
815 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
816 }
817 }
818
819 cc.mu.Unlock()
820 return nil
821}
822
823func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
824 cc.mu.RLock()
825 r := cc.resolverWrapper
826 cc.mu.RUnlock()
827 if r == nil {
828 return
829 }
830 go r.resolveNow(o)
831}
832
833// ResetConnectBackoff wakes up all subchannels in transient failure and causes
834// them to attempt another connection immediately. It also resets the backoff
835// times used for subsequent attempts regardless of the current state.
836//
837// In general, this function should not be used. Typical service or network
838// outages result in a reasonable client reconnection strategy by default.
839// However, if a previously unavailable network becomes available, this may be
840// used to trigger an immediate reconnect.
841//
842// This API is EXPERIMENTAL.
843func (cc *ClientConn) ResetConnectBackoff() {
844 cc.mu.Lock()
845 defer cc.mu.Unlock()
846 for ac := range cc.conns {
847 ac.resetConnectBackoff()
848 }
849}
850
851// Close tears down the ClientConn and all underlying connections.
852func (cc *ClientConn) Close() error {
853 defer cc.cancel()
854
855 cc.mu.Lock()
856 if cc.conns == nil {
857 cc.mu.Unlock()
858 return ErrClientConnClosing
859 }
860 conns := cc.conns
861 cc.conns = nil
862 cc.csMgr.updateState(connectivity.Shutdown)
863
864 rWrapper := cc.resolverWrapper
865 cc.resolverWrapper = nil
866 bWrapper := cc.balancerWrapper
867 cc.balancerWrapper = nil
868 cc.mu.Unlock()
869
870 cc.blockingpicker.close()
871
872 if rWrapper != nil {
873 rWrapper.close()
874 }
875 if bWrapper != nil {
876 bWrapper.close()
877 }
878
879 for ac := range conns {
880 ac.tearDown(ErrClientConnClosing)
881 }
882 if channelz.IsOn() {
883 ted := &channelz.TraceEventDesc{
884 Desc: "Channel Deleted",
885 Severity: channelz.CtINFO,
886 }
887 if cc.dopts.channelzParentID != 0 {
888 ted.Parent = &channelz.TraceEventDesc{
889 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
890 Severity: channelz.CtINFO,
891 }
892 }
893 channelz.AddTraceEvent(cc.channelzID, ted)
894 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
895 // the entity beng deleted, and thus prevent it from being deleted right away.
896 channelz.RemoveEntry(cc.channelzID)
897 }
898 return nil
899}
900
901// addrConn is a network connection to a given address.
902type addrConn struct {
903 ctx context.Context
904 cancel context.CancelFunc
905
906 cc *ClientConn
907 dopts dialOptions
908 acbw balancer.SubConn
909 scopts balancer.NewSubConnOptions
910
911 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
912 // health checking may require server to report healthy to set ac to READY), and is reset
913 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
914 // is received, transport is closed, ac has been torn down).
915 transport transport.ClientTransport // The current transport.
916
917 mu sync.Mutex
918 curAddr resolver.Address // The current address.
919 addrs []resolver.Address // All addresses that the resolver resolved to.
920
921 // Use updateConnectivityState for updating addrConn's connectivity state.
922 state connectivity.State
923
924 tearDownErr error // The reason this addrConn is torn down.
925
926 backoffIdx int // Needs to be stateful for resetConnectBackoff.
927 resetBackoff chan struct{}
928
929 channelzID int64 // channelz unique identification number.
930 czData *channelzData
931 healthCheckEnabled bool
932}
933
934// Note: this requires a lock on ac.mu.
935func (ac *addrConn) updateConnectivityState(s connectivity.State) {
936 if ac.state == s {
937 return
938 }
939
940 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
941 grpclog.Infof(updateMsg)
942 ac.state = s
943 if channelz.IsOn() {
944 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
945 Desc: updateMsg,
946 Severity: channelz.CtINFO,
947 })
948 }
949 ac.cc.handleSubConnStateChange(ac.acbw, s)
950}
951
952// adjustParams updates parameters used to create transports upon
953// receiving a GoAway.
954func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
955 switch r {
956 case transport.GoAwayTooManyPings:
957 v := 2 * ac.dopts.copts.KeepaliveParams.Time
958 ac.cc.mu.Lock()
959 if v > ac.cc.mkp.Time {
960 ac.cc.mkp.Time = v
961 }
962 ac.cc.mu.Unlock()
963 }
964}
965
966func (ac *addrConn) resetTransport() {
967 for i := 0; ; i++ {
968 tryNextAddrFromStart := grpcsync.NewEvent()
969
970 ac.mu.Lock()
971 if i > 0 {
972 ac.cc.resolveNow(resolver.ResolveNowOption{})
973 }
974 addrs := ac.addrs
975 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
976 ac.mu.Unlock()
977
978 addrLoop:
979 for _, addr := range addrs {
980 ac.mu.Lock()
981
982 if ac.state == connectivity.Shutdown {
983 ac.mu.Unlock()
984 return
985 }
986 ac.updateConnectivityState(connectivity.Connecting)
987 ac.transport = nil
988 ac.mu.Unlock()
989
990 // This will be the duration that dial gets to finish.
991 dialDuration := getMinConnectTimeout()
992 if dialDuration < backoffFor {
993 // Give dial more time as we keep failing to connect.
994 dialDuration = backoffFor
995 }
996 connectDeadline := time.Now().Add(dialDuration)
997
998 ac.mu.Lock()
999 ac.cc.mu.RLock()
1000 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1001 ac.cc.mu.RUnlock()
1002
1003 if ac.state == connectivity.Shutdown {
1004 ac.mu.Unlock()
1005 return
1006 }
1007
1008 copts := ac.dopts.copts
1009 if ac.scopts.CredsBundle != nil {
1010 copts.CredsBundle = ac.scopts.CredsBundle
1011 }
1012 hctx, hcancel := context.WithCancel(ac.ctx)
1013 defer hcancel()
1014 ac.mu.Unlock()
1015
1016 if channelz.IsOn() {
1017 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1018 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1019 Severity: channelz.CtINFO,
1020 })
1021 }
1022
1023 reconnect := grpcsync.NewEvent()
1024 prefaceReceived := make(chan struct{})
1025 newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1026 if err == nil {
1027 ac.mu.Lock()
1028 ac.curAddr = addr
1029 ac.transport = newTr
1030 ac.mu.Unlock()
1031
1032 healthCheckConfig := ac.cc.healthCheckConfig()
1033 // LB channel health checking is only enabled when all the four requirements below are met:
1034 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1035 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1036 // 3. a service config with non-empty healthCheckConfig field is provided,
1037 // 4. the current load balancer allows it.
1038 healthcheckManagingState := false
1039 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1040 if ac.cc.dopts.healthCheckFunc == nil {
1041 // TODO: add a link to the health check doc in the error message.
1042 grpclog.Error("the client side LB channel health check function has not been set.")
1043 } else {
1044 // TODO(deklerk) refactor to just return transport
1045 go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1046 healthcheckManagingState = true
1047 }
1048 }
1049 if !healthcheckManagingState {
1050 ac.mu.Lock()
1051 ac.updateConnectivityState(connectivity.Ready)
1052 ac.mu.Unlock()
1053 }
1054 } else {
1055 hcancel()
1056 if err == errConnClosing {
1057 return
1058 }
1059
1060 if tryNextAddrFromStart.HasFired() {
1061 break addrLoop
1062 }
1063 continue
1064 }
1065
1066 ac.mu.Lock()
1067 reqHandshake := ac.dopts.reqHandshake
1068 ac.mu.Unlock()
1069
1070 <-reconnect.Done()
1071 hcancel()
1072
1073 if reqHandshake == envconfig.RequireHandshakeHybrid {
1074 // In RequireHandshakeHybrid mode, we must check to see whether
1075 // server preface has arrived yet to decide whether to start
1076 // reconnecting at the top of the list (server preface received)
1077 // or continue with the next addr in the list as if the
1078 // connection were not successful (server preface not received).
1079 select {
1080 case <-prefaceReceived:
1081 // We received a server preface - huzzah! We consider this
1082 // a success and restart from the top of the addr list.
1083 ac.mu.Lock()
1084 ac.backoffIdx = 0
1085 ac.mu.Unlock()
1086 break addrLoop
1087 default:
1088 // Despite having set state to READY, in hybrid mode we
1089 // consider this a failure and continue connecting at the
1090 // next addr in the list.
1091 ac.mu.Lock()
1092 if ac.state == connectivity.Shutdown {
1093 ac.mu.Unlock()
1094 return
1095 }
1096
1097 ac.updateConnectivityState(connectivity.TransientFailure)
1098 ac.mu.Unlock()
1099
1100 if tryNextAddrFromStart.HasFired() {
1101 break addrLoop
1102 }
1103 }
1104 } else {
1105 // In RequireHandshakeOn mode, we would have already waited for
1106 // the server preface, so we consider this a success and restart
1107 // from the top of the addr list. In RequireHandshakeOff mode,
1108 // we don't care to wait for the server preface before
1109 // considering this a success, so we also restart from the top
1110 // of the addr list.
1111 ac.mu.Lock()
1112 ac.backoffIdx = 0
1113 ac.mu.Unlock()
1114 break addrLoop
1115 }
1116 }
1117
1118 // After exhausting all addresses, or after need to reconnect after a
1119 // READY, the addrConn enters TRANSIENT_FAILURE.
1120 ac.mu.Lock()
1121 if ac.state == connectivity.Shutdown {
1122 ac.mu.Unlock()
1123 return
1124 }
1125 ac.updateConnectivityState(connectivity.TransientFailure)
1126
1127 // Backoff.
1128 b := ac.resetBackoff
1129 timer := time.NewTimer(backoffFor)
1130 acctx := ac.ctx
1131 ac.mu.Unlock()
1132
1133 select {
1134 case <-timer.C:
1135 ac.mu.Lock()
1136 ac.backoffIdx++
1137 ac.mu.Unlock()
1138 case <-b:
1139 timer.Stop()
1140 case <-acctx.Done():
1141 timer.Stop()
1142 return
1143 }
1144 }
1145}
1146
1147// createTransport creates a connection to one of the backends in addrs. It
1148// sets ac.transport in the success case, or it returns an error if it was
1149// unable to successfully create a transport.
1150//
1151// If waitForHandshake is enabled, it blocks until server preface arrives.
1152func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
1153 onCloseCalled := make(chan struct{})
1154
1155 target := transport.TargetInfo{
1156 Addr: addr.Addr,
1157 Metadata: addr.Metadata,
1158 Authority: ac.cc.authority,
1159 }
1160
1161 prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
1162
1163 onGoAway := func(r transport.GoAwayReason) {
1164 ac.mu.Lock()
1165 ac.adjustParams(r)
1166 ac.mu.Unlock()
1167 reconnect.Fire()
1168 }
1169
1170 onClose := func() {
1171 close(onCloseCalled)
1172 prefaceTimer.Stop()
1173 reconnect.Fire()
1174 }
1175
1176 onPrefaceReceipt := func() {
1177 close(prefaceReceived)
1178 prefaceTimer.Stop()
1179 }
1180
1181 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1182 defer cancel()
1183 if channelz.IsOn() {
1184 copts.ChannelzParentID = ac.channelzID
1185 }
1186
1187 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1188
1189 if err == nil {
1190 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1191 select {
1192 case <-prefaceTimer.C:
1193 // We didn't get the preface in time.
1194 newTr.Close()
1195 err = errors.New("timed out waiting for server handshake")
1196 case <-prefaceReceived:
1197 // We got the preface - huzzah! things are good.
1198 case <-onCloseCalled:
1199 // The transport has already closed - noop.
1200 return nil, errors.New("connection closed")
1201 }
1202 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1203 go func() {
1204 select {
1205 case <-prefaceTimer.C:
1206 // We didn't get the preface in time.
1207 newTr.Close()
1208 case <-prefaceReceived:
1209 // We got the preface just in the nick of time - huzzah!
1210 case <-onCloseCalled:
1211 // The transport has already closed - noop.
1212 }
1213 }()
1214 }
1215 }
1216
1217 if err != nil {
1218 // newTr is either nil, or closed.
1219 ac.cc.blockingpicker.updateConnectionError(err)
1220 ac.mu.Lock()
1221 if ac.state == connectivity.Shutdown {
1222 // ac.tearDown(...) has been invoked.
1223 ac.mu.Unlock()
1224
1225 return nil, errConnClosing
1226 }
1227 ac.mu.Unlock()
1228 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1229 return nil, err
1230 }
1231
1232 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1233 ac.mu.Lock()
1234 if ac.state == connectivity.Shutdown {
1235 ac.mu.Unlock()
1236 newTr.Close()
1237 return nil, errConnClosing
1238 }
1239 ac.mu.Unlock()
1240
1241 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1242 ac.mu.Lock()
1243 if ac.state == connectivity.Shutdown {
1244 ac.mu.Unlock()
1245 newTr.Close()
1246 return nil, errConnClosing
1247 }
1248 ac.mu.Unlock()
1249
1250 return newTr, nil
1251}
1252
1253func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1254 // Set up the health check helper functions
1255 newStream := func() (interface{}, error) {
1256 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1257 }
1258 firstReady := true
1259 reportHealth := func(ok bool) {
1260 ac.mu.Lock()
1261 defer ac.mu.Unlock()
1262 if ac.transport != newTr {
1263 return
1264 }
1265 if ok {
1266 if firstReady {
1267 firstReady = false
1268 ac.curAddr = addr
1269 }
1270 ac.updateConnectivityState(connectivity.Ready)
1271 } else {
1272 ac.updateConnectivityState(connectivity.TransientFailure)
1273 }
1274 }
1275 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
1276 if err != nil {
1277 if status.Code(err) == codes.Unimplemented {
1278 if channelz.IsOn() {
1279 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1280 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1281 Severity: channelz.CtError,
1282 })
1283 }
1284 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1285 } else {
1286 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1287 }
1288 }
1289}
1290
1291func (ac *addrConn) resetConnectBackoff() {
1292 ac.mu.Lock()
1293 close(ac.resetBackoff)
1294 ac.backoffIdx = 0
1295 ac.resetBackoff = make(chan struct{})
1296 ac.mu.Unlock()
1297}
1298
1299// getReadyTransport returns the transport if ac's state is READY.
1300// Otherwise it returns nil, false.
1301// If ac's state is IDLE, it will trigger ac to connect.
1302func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1303 ac.mu.Lock()
1304 if ac.state == connectivity.Ready && ac.transport != nil {
1305 t := ac.transport
1306 ac.mu.Unlock()
1307 return t, true
1308 }
1309 var idle bool
1310 if ac.state == connectivity.Idle {
1311 idle = true
1312 }
1313 ac.mu.Unlock()
1314 // Trigger idle ac to connect.
1315 if idle {
1316 ac.connect()
1317 }
1318 return nil, false
1319}
1320
// tearDown starts to tear down the addrConn. While holding ac.mu it:
//   - clears ac.transport and ac.curAddr,
//   - moves ac to Shutdown,
//   - calls ac.cancel(),
//   - records err in ac.tearDownErr.
//
// If err is errConnDrain and there was a transport, that transport is closed
// with GracefulClose, with ac.mu released around the call. When channelz is
// on, it records a "Subchannel Deleted" trace event and removes ac's channelz
// entry. Calling tearDown on an addrConn that is already Shutdown is a no-op.
//
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		// Already torn down.
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancelation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.tearDownErr = err
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		// NOTE(review): the onClose closure built in createTransport does not
		// itself lock ac.mu (its onGoAway sibling does); confirm which
		// transport callback needs ac.mu released before changing this.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		// NOTE(review): "Subchanel" in the parent description below is
		// misspelled; left unchanged in case tests or tooling match on it.
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1365
1366func (ac *addrConn) getState() connectivity.State {
1367 ac.mu.Lock()
1368 defer ac.mu.Unlock()
1369 return ac.state
1370}
1371
1372func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1373 ac.mu.Lock()
1374 addr := ac.curAddr.Addr
1375 ac.mu.Unlock()
1376 return &channelz.ChannelInternalMetric{
1377 State: ac.getState(),
1378 Target: addr,
1379 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1380 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1381 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1382 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1383 }
1384}
1385
1386func (ac *addrConn) incrCallsStarted() {
1387 atomic.AddInt64(&ac.czData.callsStarted, 1)
1388 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1389}
1390
1391func (ac *addrConn) incrCallsSucceeded() {
1392 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1393}
1394
1395func (ac *addrConn) incrCallsFailed() {
1396 atomic.AddInt64(&ac.czData.callsFailed, 1)
1397}
1398
// retryThrottler implements token-based retry throttling. Each call to
// throttle spends one token, never going below zero. Each successfulRPC
// refunds ratio tokens, capped at max. Retries are throttled while the token
// count is at or below thresh. A nil *retryThrottler never throttles.
type retryThrottler struct {
	max    float64 // upper bound on tokens
	thresh float64 // retries are throttled at or below this token count
	ratio  float64 // tokens refunded per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle takes one retry token from the pool, clamping at zero. It reports
// whether the retry should be throttled (disallowed) under the retry
// throttling policy from the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC refunds ratio tokens to the pool, capping the total at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	next := rt.tokens + rt.ratio
	if next > rt.max {
		next = rt.max
	}
	rt.tokens = next
}
1435
1436type channelzChannel struct {
1437 cc *ClientConn
1438}
1439
1440func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1441 return c.cc.channelzMetric()
1442}
1443
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)