blob: 14ce9c76aa37db905d70d5179579e5887421fedb [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/grpcsync"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minConnectTimeout is the minimum amount of time a connection attempt
	// is given to complete.
	minConnectTimeout = time.Second * 20
	// grpclbName is the name of the grpclb balancer. It must stay in sync
	// with grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
59
60var (
61 // ErrClientConnClosing indicates that the operation is illegal because
62 // the ClientConn is closing.
63 //
64 // Deprecated: this error should not be relied upon by users; use the status
65 // code of Canceled instead.
66 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
67 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
68 errConnDrain = errors.New("grpc: the connection is drained")
69 // errConnClosing indicates that the connection is closing.
70 errConnClosing = errors.New("grpc: the connection is closing")
71 // errBalancerClosed indicates that the balancer is closed.
72 errBalancerClosed = errors.New("grpc: balancer is closed")
73 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
74 // service config.
75 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
76)
77
// Errors returned from Dial and DialContext when the supplied credential
// options are missing or contradict each other.
var (
	// errNoTransportSecurity: neither transport credentials nor a credentials
	// bundle were configured, and WithInsecure was not used to opt out of
	// security explicitly.
	errNoTransportSecurity = errors.New(
		"grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)",
	)

	// errTransportCredsAndBundle: a credentials bundle was supplied together
	// with individual TransportCredentials.
	errTransportCredsAndBundle = errors.New(
		"grpc: credentials.Bundle may not be used with individual TransportCredentials",
	)

	// errTransportCredentialsMissing: per-RPC credentials that require a
	// secure transport (e.g., an OAuth2 token) were configured on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New(
		"grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)",
	)

	// errCredentialsConflict: both grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were applied to the same connection.
	errCredentialsConflict = errors.New(
		"grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)",
	)
)
95
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default write and
	// read buffer sizes, 32 KiB each.
	defaultWriteBufSize = 32 << 10
	defaultReadBufSize  = 32 << 10
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106 return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any error the partially constructed ClientConn is closed (see the
// deferred functions below) and a nil *ClientConn is returned.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the atomic.Value with a typed nil *retryThrottler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc's own context is rooted at Background rather than at ctx, so it is
	// not tied to the lifetime of the dial context.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply the caller's options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// If the named result err is non-nil when the function returns, tear the
	// half-built ClientConn down.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register the channel with channelz; a non-zero parent ID marks it as a
	// nested channel and adds a parent trace event.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credential options: a secure dial needs exactly one of
	// TransportCredentials or CredsBundle; an insecure dial may have neither,
	// and none of its per-RPC credentials may require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config, if one was supplied as raw JSON.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		// The assertion result is discarded: if Config is not a
		// *ServiceConfig, defaultServiceConfig is left nil.
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Without a custom dialer, fall back to net.Dialer wrapped by
	// newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append grpcUA to a caller-supplied user agent, or use it alone.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// A positive timeout option bounds the remainder of the dial.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Deferred last, so it runs first on return: if ctx is already done, the
	// results are overwritten with (nil, ctx.Err()), which in turn makes the
	// earlier deferred function close cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: the credentials' ServerName, then the authority
	// dial option (insecure dials only), then the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming later service config updates in the background; the
	// watcher exits when scChan is closed or cc.ctx is done.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Balancers receive a clone of the transport credentials rather than the
	// original value.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up only when the connection error reports itself as
				// non-temporary; errors without a Temporary method keep the
				// loop waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336 interceptors := cc.dopts.chainUnaryInts
337 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338 // be executed before any other chained interceptors.
339 if cc.dopts.unaryInt != nil {
340 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341 }
342 var chainedInt UnaryClientInterceptor
343 if len(interceptors) == 0 {
344 chainedInt = nil
345 } else if len(interceptors) == 1 {
346 chainedInt = interceptors[0]
347 } else {
348 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350 }
351 }
352 cc.dopts.unaryInt = chainedInt
353}
354
355// getChainUnaryInvoker recursively generate the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357 if curr == len(interceptors)-1 {
358 return finalInvoker
359 }
360 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362 }
363}
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367 interceptors := cc.dopts.chainStreamInts
368 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369 // be executed before any other chained interceptors.
370 if cc.dopts.streamInt != nil {
371 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372 }
373 var chainedInt StreamClientInterceptor
374 if len(interceptors) == 0 {
375 chainedInt = nil
376 } else if len(interceptors) == 1 {
377 chainedInt = interceptors[0]
378 } else {
379 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381 }
382 }
383 cc.dopts.streamInt = chainedInt
384}
385
386// getChainStreamer recursively generate the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388 if curr == len(interceptors)-1 {
389 return finalStreamer
390 }
391 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393 }
394}
395
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu is held by updateState, getState and getNotifyChan while they touch
	// the fields below.
	mu sync.Mutex
	// state is the current connectivity state; once it is Shutdown,
	// updateState ignores further updates.
	state connectivity.State
	// notifyChan is created lazily by getNotifyChan and is closed and reset
	// to nil by updateState when the state changes.
	notifyChan chan struct{}
	// channelzID is the channelz ID that state-change trace events are
	// recorded against.
	channelzID int64
}
404
405// updateState updates the connectivity.State of ClientConn.
406// If there's a change it notifies goroutines waiting on state change to
407// happen.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409 csm.mu.Lock()
410 defer csm.mu.Unlock()
411 if csm.state == connectivity.Shutdown {
412 return
413 }
414 if csm.state == state {
415 return
416 }
417 csm.state = state
418 if channelz.IsOn() {
419 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
421 Severity: channelz.CtINFO,
422 })
423 }
424 if csm.notifyChan != nil {
425 // There are other goroutines waiting on this channel.
426 close(csm.notifyChan)
427 csm.notifyChan = nil
428 }
429}
430
431func (csm *connectivityStateManager) getState() connectivity.State {
432 csm.mu.Lock()
433 defer csm.mu.Unlock()
434 return csm.state
435}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438 csm.mu.Lock()
439 defer csm.mu.Unlock()
440 if csm.notifyChan == nil {
441 csm.notifyChan = make(chan struct{})
442 }
443 return csm.notifyChan
444}
445
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx and cancel are created in DialContext from context.Background();
	// addrConn contexts are derived from ctx, and Close calls cancel.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the target string exactly as passed to DialContext.
	target string
	// parsedTarget is filled in by DialContext and handed to balancers via
	// balancerBuildOpts.
	parsedTarget resolver.Target
	// authority is chosen in DialContext from the credentials' ServerName,
	// the authority dial option, or the parsed target's endpoint.
	authority string
	// dopts holds the defaults with the caller's DialOptions applied.
	dopts dialOptions
	// csMgr tracks the channel's connectivity state.
	csMgr *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	// blockingpicker is what getTransport picks a transport from.
	blockingpicker *pickerWrapper

	// mu is held by the methods in this file while they read or write
	// resolverWrapper, sc, conns, curBalancerName and balancerWrapper.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	// sc is the service config currently in effect; nil until one is applied.
	sc *ServiceConfig
	// conns is the set of live addrConns. Close sets it to nil, and other
	// methods treat a nil map as "the ClientConn is closed".
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler always holds a *retryThrottler, which is nil when the
	// service config has no retry throttling.
	retryThrottler atomic.Value

	// firstResolveEvent fires the first time updateResolverState returns.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	// czData holds the call counters reported by channelzMetric.
	czData *channelzData
}
486
487// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
488// ctx expires. A true value is returned in former case and false in latter.
489// This is an EXPERIMENTAL API.
490func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
491 ch := cc.csMgr.getNotifyChan()
492 if cc.csMgr.getState() != sourceState {
493 return true
494 }
495 select {
496 case <-ctx.Done():
497 return false
498 case <-ch:
499 return true
500 }
501}
502
503// GetState returns the connectivity.State of ClientConn.
504// This is an EXPERIMENTAL API.
505func (cc *ClientConn) GetState() connectivity.State {
506 return cc.csMgr.getState()
507}
508
509func (cc *ClientConn) scWatcher() {
510 for {
511 select {
512 case sc, ok := <-cc.dopts.scChan:
513 if !ok {
514 return
515 }
516 cc.mu.Lock()
517 // TODO: load balance policy runtime change is ignored.
518 // We may revisit this decision in the future.
519 cc.sc = &sc
520 cc.mu.Unlock()
521 case <-cc.ctx.Done():
522 return
523 }
524 }
525}
526
527// waitForResolvedAddrs blocks until the resolver has provided addresses or the
528// context expires. Returns nil unless the context expires first; otherwise
529// returns a status error based on the context.
530func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
531 // This is on the RPC path, so we use a fast path to avoid the
532 // more-expensive "select" below after the resolver has returned once.
533 if cc.firstResolveEvent.HasFired() {
534 return nil
535 }
536 select {
537 case <-cc.firstResolveEvent.Done():
538 return nil
539 case <-ctx.Done():
540 return status.FromContextError(ctx.Err()).Err()
541 case <-cc.ctx.Done():
542 return ErrClientConnClosing
543 }
544}
545
546var emptyServiceConfig *ServiceConfig
547
548func init() {
549 cfg := parseServiceConfig("{}")
550 if cfg.Err != nil {
551 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
552 }
553 emptyServiceConfig = cfg.Config.(*ServiceConfig)
554}
555
556func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
557 if cc.sc != nil {
558 cc.applyServiceConfigAndBalancer(cc.sc, addrs)
559 return
560 }
561 if cc.dopts.defaultServiceConfig != nil {
562 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
563 } else {
564 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
565 }
566}
567
// updateResolverState processes a new state s (or an error err) reported by
// the resolver: it applies the appropriate service config, switches balancer
// if needed, and forwards the state to the balancer.
//
// It returns nil if the ClientConn is already closed,
// balancer.ErrBadResolverState if err is non-nil or the service config in s is
// unusable, and otherwise whatever the balancer's updateClientConnState
// returns. firstResolveEvent is fired when the function returns, on every
// path.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			// The resolver's service config is unusable. If a balancer
			// already exists, keep going with the previously applied config;
			// otherwise fail RPCs with an error picker and stop here.
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// The service config's LB config is only forwarded when no balancer was
	// forced through a dial option.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Snapshot the balancer name and wrapper under the lock; the balancer is
	// then called after the lock is released.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// The removal is done in place, shifting later entries down within
		// s.Addresses' existing backing array.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
645
646// switchBalancer starts the switching from current balancer to the balancer
647// with the given name.
648//
649// It will NOT send the current address list to the new balancer. If needed,
650// caller of this function should send address list to the new balancer after
651// this function returns.
652//
653// Caller must hold cc.mu.
654func (cc *ClientConn) switchBalancer(name string) {
655 if strings.EqualFold(cc.curBalancerName, name) {
656 return
657 }
658
659 grpclog.Infof("ClientConn switching balancer to %q", name)
660 if cc.dopts.balancerBuilder != nil {
661 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
662 return
663 }
664 if cc.balancerWrapper != nil {
665 cc.balancerWrapper.close()
666 }
667
668 builder := balancer.Get(name)
669 if channelz.IsOn() {
670 if builder == nil {
671 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
672 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
673 Severity: channelz.CtWarning,
674 })
675 } else {
676 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
677 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
678 Severity: channelz.CtINFO,
679 })
680 }
681 }
682 if builder == nil {
683 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
684 builder = newPickfirstBuilder()
685 }
686
687 cc.curBalancerName = builder.Name()
688 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
689}
690
691func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
692 cc.mu.Lock()
693 if cc.conns == nil {
694 cc.mu.Unlock()
695 return
696 }
697 // TODO(bar switching) send updates to all balancer wrappers when balancer
698 // gracefully switching is supported.
699 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
700 cc.mu.Unlock()
701}
702
703// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
704//
705// Caller needs to make sure len(addrs) > 0.
706func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
707 ac := &addrConn{
708 cc: cc,
709 addrs: addrs,
710 scopts: opts,
711 dopts: cc.dopts,
712 czData: new(channelzData),
713 resetBackoff: make(chan struct{}),
714 }
715 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
716 // Track ac in cc. This needs to be done before any getTransport(...) is called.
717 cc.mu.Lock()
718 if cc.conns == nil {
719 cc.mu.Unlock()
720 return nil, ErrClientConnClosing
721 }
722 if channelz.IsOn() {
723 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
724 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
725 Desc: "Subchannel Created",
726 Severity: channelz.CtINFO,
727 Parent: &channelz.TraceEventDesc{
728 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
729 Severity: channelz.CtINFO,
730 },
731 })
732 }
733 cc.conns[ac] = struct{}{}
734 cc.mu.Unlock()
735 return ac, nil
736}
737
738// removeAddrConn removes the addrConn in the subConn from clientConn.
739// It also tears down the ac with the given error.
740func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
741 cc.mu.Lock()
742 if cc.conns == nil {
743 cc.mu.Unlock()
744 return
745 }
746 delete(cc.conns, ac)
747 cc.mu.Unlock()
748 ac.tearDown(err)
749}
750
751func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
752 return &channelz.ChannelInternalMetric{
753 State: cc.GetState(),
754 Target: cc.target,
755 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
756 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
757 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
758 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
759 }
760}
761
762// Target returns the target string of the ClientConn.
763// This is an EXPERIMENTAL API.
764func (cc *ClientConn) Target() string {
765 return cc.target
766}
767
768func (cc *ClientConn) incrCallsStarted() {
769 atomic.AddInt64(&cc.czData.callsStarted, 1)
770 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
771}
772
773func (cc *ClientConn) incrCallsSucceeded() {
774 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
775}
776
777func (cc *ClientConn) incrCallsFailed() {
778 atomic.AddInt64(&cc.czData.callsFailed, 1)
779}
780
781// connect starts creating a transport.
782// It does nothing if the ac is not IDLE.
783// TODO(bar) Move this to the addrConn section.
784func (ac *addrConn) connect() error {
785 ac.mu.Lock()
786 if ac.state == connectivity.Shutdown {
787 ac.mu.Unlock()
788 return errConnClosing
789 }
790 if ac.state != connectivity.Idle {
791 ac.mu.Unlock()
792 return nil
793 }
794 // Update connectivity state within the lock to prevent subsequent or
795 // concurrent calls from resetting the transport more than once.
796 ac.updateConnectivityState(connectivity.Connecting, nil)
797 ac.mu.Unlock()
798
799 // Start a goroutine connecting to the server asynchronously.
800 go ac.resetTransport()
801 return nil
802}
803
804// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
805//
806// If ac is Connecting, it returns false. The caller should tear down the ac and
807// create a new one. Note that the backoff will be reset when this happens.
808//
809// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
810// addresses will be picked up by retry in the next iteration after backoff.
811//
812// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
813//
814// If ac is Ready, it checks whether current connected address of ac is in the
815// new addrs list.
816// - If true, it updates ac.addrs and returns true. The ac will keep using
817// the existing connection.
818// - If false, it does nothing and returns false.
819func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
820 ac.mu.Lock()
821 defer ac.mu.Unlock()
822 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
823 if ac.state == connectivity.Shutdown ||
824 ac.state == connectivity.TransientFailure ||
825 ac.state == connectivity.Idle {
826 ac.addrs = addrs
827 return true
828 }
829
830 if ac.state == connectivity.Connecting {
831 return false
832 }
833
834 // ac.state is Ready, try to find the connected address.
835 var curAddrFound bool
836 for _, a := range addrs {
837 if reflect.DeepEqual(ac.curAddr, a) {
838 curAddrFound = true
839 break
840 }
841 }
842 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
843 if curAddrFound {
844 ac.addrs = addrs
845 }
846
847 return curAddrFound
848}
849
850// GetMethodConfig gets the method config of the input method.
851// If there's an exact match for input method (i.e. /service/method), we return
852// the corresponding MethodConfig.
853// If there isn't an exact match for the input method, we look for the default config
854// under the service (i.e /service/). If there is a default MethodConfig for
855// the service, we return it.
856// Otherwise, we return an empty MethodConfig.
857func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
858 // TODO: Avoid the locking here.
859 cc.mu.RLock()
860 defer cc.mu.RUnlock()
861 if cc.sc == nil {
862 return MethodConfig{}
863 }
864 m, ok := cc.sc.Methods[method]
865 if !ok {
866 i := strings.LastIndex(method, "/")
867 m = cc.sc.Methods[method[:i+1]]
868 }
869 return m
870}
871
872func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
873 cc.mu.RLock()
874 defer cc.mu.RUnlock()
875 if cc.sc == nil {
876 return nil
877 }
878 return cc.sc.healthCheckConfig
879}
880
881func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
882 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
883 Ctx: ctx,
884 FullMethodName: method,
885 })
886 if err != nil {
887 return nil, nil, toRPCErr(err)
888 }
889 return t, done, nil
890}
891
892func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
893 if sc == nil {
894 // should never reach here.
895 return
896 }
897 cc.sc = sc
898
899 if cc.sc.retryThrottling != nil {
900 newThrottler := &retryThrottler{
901 tokens: cc.sc.retryThrottling.MaxTokens,
902 max: cc.sc.retryThrottling.MaxTokens,
903 thresh: cc.sc.retryThrottling.MaxTokens / 2,
904 ratio: cc.sc.retryThrottling.TokenRatio,
905 }
906 cc.retryThrottler.Store(newThrottler)
907 } else {
908 cc.retryThrottler.Store((*retryThrottler)(nil))
909 }
910
911 if cc.dopts.balancerBuilder == nil {
912 // Only look at balancer types and switch balancer if balancer dial
913 // option is not set.
914 var newBalancerName string
915 if cc.sc != nil && cc.sc.lbConfig != nil {
916 newBalancerName = cc.sc.lbConfig.name
917 } else {
918 var isGRPCLB bool
919 for _, a := range addrs {
920 if a.Type == resolver.GRPCLB {
921 isGRPCLB = true
922 break
923 }
924 }
925 if isGRPCLB {
926 newBalancerName = grpclbName
927 } else if cc.sc != nil && cc.sc.LB != nil {
928 newBalancerName = *cc.sc.LB
929 } else {
930 newBalancerName = PickFirstBalancerName
931 }
932 }
933 cc.switchBalancer(newBalancerName)
934 } else if cc.balancerWrapper == nil {
935 // Balancer dial option was set, and this is the first time handling
936 // resolved addresses. Build a balancer with dopts.balancerBuilder.
937 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
938 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
939 }
940}
941
942func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
943 cc.mu.RLock()
944 r := cc.resolverWrapper
945 cc.mu.RUnlock()
946 if r == nil {
947 return
948 }
949 go r.resolveNow(o)
950}
951
952// ResetConnectBackoff wakes up all subchannels in transient failure and causes
953// them to attempt another connection immediately. It also resets the backoff
954// times used for subsequent attempts regardless of the current state.
955//
956// In general, this function should not be used. Typical service or network
957// outages result in a reasonable client reconnection strategy by default.
958// However, if a previously unavailable network becomes available, this may be
959// used to trigger an immediate reconnect.
960//
961// This API is EXPERIMENTAL.
962func (cc *ClientConn) ResetConnectBackoff() {
963 cc.mu.Lock()
964 conns := cc.conns
965 cc.mu.Unlock()
966 for ac := range conns {
967 ac.resetConnectBackoff()
968 }
969}
970
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn has already been closed
// (detected by cc.conns being nil) and nil otherwise. cc.cancel is deferred,
// so the ClientConn's context is cancelled on every return path, after the
// rest of the teardown has run.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach everything that needs closing while holding the lock, then do
	// the actual closing after the lock is released.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// conns was detached from cc above, so it is iterated here without cc.mu.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// A non-zero parent ID means this is a nested channel; also record
		// the deletion on the parent's trace.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1020
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is the parent of the per-attempt connect context and of the health
	// check context; cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn                 // The ClientConn this addrConn belongs to.
	dopts  dialOptions                 // Dial options; copts.KeepaliveParams is refreshed from cc.mkp before each connection attempt.
	acbw   balancer.SubConn            // Passed to cc.handleSubConnStateChange on state changes.
	scopts balancer.NewSubConnOptions  // Supplies CredsBundle and HealthCheckEnabled.

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	//
	// NOTE(review): although declared above mu, this field is read and
	// written with mu held in the methods below.
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{} // Closed (and replaced) by resetConnectBackoff to cut a backoff wait short.

	channelzID int64 // channelz unique identification number.
	czData     *channelzData // Call counters updated atomically by the incrCalls* methods.
}
1050
1051// Note: this requires a lock on ac.mu.
1052func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1053 if ac.state == s {
1054 return
1055 }
1056
1057 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
1058 ac.state = s
1059 if channelz.IsOn() {
1060 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1061 Desc: updateMsg,
1062 Severity: channelz.CtINFO,
1063 })
1064 }
1065 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1066}
1067
1068// adjustParams updates parameters used to create transports upon
1069// receiving a GoAway.
1070func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1071 switch r {
1072 case transport.GoAwayTooManyPings:
1073 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1074 ac.cc.mu.Lock()
1075 if v > ac.cc.mkp.Time {
1076 ac.cc.mkp.Time = v
1077 }
1078 ac.cc.mu.Unlock()
1079 }
1080}
1081
// resetTransport is the connection loop of an addrConn. Each iteration tries
// to connect to one of ac.addrs; on failure it moves to TRANSIENT_FAILURE and
// backs off before retrying, and on success it installs the new transport,
// starts health checking, and blocks until that transport goes away, at which
// point the loop starts over. The loop exits when ac is in the Shutdown state
// or ac.ctx is done during backoff.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Every attempt after the first asks the resolver for fresh addresses.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		// A single deadline is shared by the attempts against all addresses.
		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			// Capture the channel under the lock; resetConnectBackoff closes
			// it to end the wait early.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// Full backoff elapsed: the next failure backs off longer.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset: retry right away without bumping the index.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// ac was torn down while connecting; the new transport is unused.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// hctx scopes the health check to the lifetime of this transport.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1178
1179// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1180// first successful one. It returns the transport, the address and a Event in
1181// the successful case. The Event fires when the returned transport disconnects.
1182func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1183 var firstConnErr error
1184 for _, addr := range addrs {
1185 ac.mu.Lock()
1186 if ac.state == connectivity.Shutdown {
1187 ac.mu.Unlock()
1188 return nil, resolver.Address{}, nil, errConnClosing
1189 }
1190
1191 ac.cc.mu.RLock()
1192 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1193 ac.cc.mu.RUnlock()
1194
1195 copts := ac.dopts.copts
1196 if ac.scopts.CredsBundle != nil {
1197 copts.CredsBundle = ac.scopts.CredsBundle
1198 }
1199 ac.mu.Unlock()
1200
1201 if channelz.IsOn() {
1202 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1203 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1204 Severity: channelz.CtINFO,
1205 })
1206 }
1207
1208 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1209 if err == nil {
1210 return newTr, addr, reconnect, nil
1211 }
1212 if firstConnErr == nil {
1213 firstConnErr = err
1214 }
1215 ac.cc.blockingpicker.updateConnectionError(err)
1216 }
1217
1218 // Couldn't connect to any address.
1219 return nil, resolver.Address{}, nil, firstConnErr
1220}
1221
1222// createTransport creates a connection to addr. It returns the transport and a
1223// Event in the successful case. The Event fires when the returned transport
1224// disconnects.
1225func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1226 prefaceReceived := make(chan struct{})
1227 onCloseCalled := make(chan struct{})
1228 reconnect := grpcsync.NewEvent()
1229
1230 authority := ac.cc.authority
1231 // addr.ServerName takes precedent over ClientConn authority, if present.
1232 if addr.ServerName != "" {
1233 authority = addr.ServerName
1234 }
1235
1236 target := transport.TargetInfo{
1237 Addr: addr.Addr,
1238 Metadata: addr.Metadata,
1239 Authority: authority,
1240 }
1241
1242 once := sync.Once{}
1243 onGoAway := func(r transport.GoAwayReason) {
1244 ac.mu.Lock()
1245 ac.adjustParams(r)
1246 once.Do(func() {
1247 if ac.state == connectivity.Ready {
1248 // Prevent this SubConn from being used for new RPCs by setting its
1249 // state to Connecting.
1250 //
1251 // TODO: this should be Idle when grpc-go properly supports it.
1252 ac.updateConnectivityState(connectivity.Connecting, nil)
1253 }
1254 })
1255 ac.mu.Unlock()
1256 reconnect.Fire()
1257 }
1258
1259 onClose := func() {
1260 ac.mu.Lock()
1261 once.Do(func() {
1262 if ac.state == connectivity.Ready {
1263 // Prevent this SubConn from being used for new RPCs by setting its
1264 // state to Connecting.
1265 //
1266 // TODO: this should be Idle when grpc-go properly supports it.
1267 ac.updateConnectivityState(connectivity.Connecting, nil)
1268 }
1269 })
1270 ac.mu.Unlock()
1271 close(onCloseCalled)
1272 reconnect.Fire()
1273 }
1274
1275 onPrefaceReceipt := func() {
1276 close(prefaceReceived)
1277 }
1278
1279 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1280 defer cancel()
1281 if channelz.IsOn() {
1282 copts.ChannelzParentID = ac.channelzID
1283 }
1284
1285 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1286 if err != nil {
1287 // newTr is either nil, or closed.
1288 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1289 return nil, nil, err
1290 }
1291
1292 select {
1293 case <-time.After(time.Until(connectDeadline)):
1294 // We didn't get the preface in time.
1295 newTr.Close()
1296 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1297 return nil, nil, errors.New("timed out waiting for server handshake")
1298 case <-prefaceReceived:
1299 // We got the preface - huzzah! things are good.
1300 case <-onCloseCalled:
1301 // The transport has already closed - noop.
1302 return nil, nil, errors.New("connection closed")
1303 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1304 }
1305 return newTr, reconnect, nil
1306}
1307
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// The health check runs on its own goroutine and is given ctx; its helpers
// refuse to act once ac.transport is no longer the transport that was current
// when this function was called.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// Stays false on every early return below, in which case the deferred
	// function marks ac READY directly instead of leaving that to the health
	// check.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		grpclog.Error("Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers compare against currentTr so that a health check started
	// for an old transport cannot affect a newer one.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	go func() {
		// NOTE(review): this re-reads ac.cc.dopts.healthCheckFunc instead of
		// using the nil-checked local healthCheckFunc above; the two should be
		// the same value as long as dopts is not modified — confirm.
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				if channelz.IsOn() {
					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
						Severity: channelz.CtError,
					})
				}
				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1386
1387func (ac *addrConn) resetConnectBackoff() {
1388 ac.mu.Lock()
1389 close(ac.resetBackoff)
1390 ac.backoffIdx = 0
1391 ac.resetBackoff = make(chan struct{})
1392 ac.mu.Unlock()
1393}
1394
1395// getReadyTransport returns the transport if ac's state is READY.
1396// Otherwise it returns nil, false.
1397// If ac's state is IDLE, it will trigger ac to connect.
1398func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1399 ac.mu.Lock()
1400 if ac.state == connectivity.Ready && ac.transport != nil {
1401 t := ac.transport
1402 ac.mu.Unlock()
1403 return t, true
1404 }
1405 var idle bool
1406 if ac.state == connectivity.Idle {
1407 idle = true
1408 }
1409 ac.mu.Unlock()
1410 // Trigger idle ac to connect.
1411 if idle {
1412 ac.connect()
1413 }
1414 return nil, false
1415}
1416
1417// tearDown starts to tear down the addrConn.
1418// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1419// some edge cases (e.g., the caller opens and closes many addrConn's in a
1420// tight loop.
1421// tearDown doesn't remove ac from ac.cc.conns.
1422func (ac *addrConn) tearDown(err error) {
1423 ac.mu.Lock()
1424 if ac.state == connectivity.Shutdown {
1425 ac.mu.Unlock()
1426 return
1427 }
1428 curTr := ac.transport
1429 ac.transport = nil
1430 // We have to set the state to Shutdown before anything else to prevent races
1431 // between setting the state and logic that waits on context cancellation / etc.
1432 ac.updateConnectivityState(connectivity.Shutdown, nil)
1433 ac.cancel()
1434 ac.curAddr = resolver.Address{}
1435 if err == errConnDrain && curTr != nil {
1436 // GracefulClose(...) may be executed multiple times when
1437 // i) receiving multiple GoAway frames from the server; or
1438 // ii) there are concurrent name resolver/Balancer triggered
1439 // address removal and GoAway.
1440 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1441 ac.mu.Unlock()
1442 curTr.GracefulClose()
1443 ac.mu.Lock()
1444 }
1445 if channelz.IsOn() {
1446 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1447 Desc: "Subchannel Deleted",
1448 Severity: channelz.CtINFO,
1449 Parent: &channelz.TraceEventDesc{
1450 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1451 Severity: channelz.CtINFO,
1452 },
1453 })
1454 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1455 // the entity being deleted, and thus prevent it from being deleted right away.
1456 channelz.RemoveEntry(ac.channelzID)
1457 }
1458 ac.mu.Unlock()
1459}
1460
1461func (ac *addrConn) getState() connectivity.State {
1462 ac.mu.Lock()
1463 defer ac.mu.Unlock()
1464 return ac.state
1465}
1466
1467func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1468 ac.mu.Lock()
1469 addr := ac.curAddr.Addr
1470 ac.mu.Unlock()
1471 return &channelz.ChannelInternalMetric{
1472 State: ac.getState(),
1473 Target: addr,
1474 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1475 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1476 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1477 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1478 }
1479}
1480
1481func (ac *addrConn) incrCallsStarted() {
1482 atomic.AddInt64(&ac.czData.callsStarted, 1)
1483 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1484}
1485
1486func (ac *addrConn) incrCallsSucceeded() {
1487 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1488}
1489
1490func (ac *addrConn) incrCallsFailed() {
1491 atomic.AddInt64(&ac.czData.callsFailed, 1)
1492}
1493
// retryThrottler is a token bucket that decides whether retries are currently
// allowed. Tokens are consumed by throttle and replenished by successfulRPC;
// retries are throttled while the token count is at or below thresh.
//
// A nil *retryThrottler is valid and never throttles.
type retryThrottler struct {
	max    float64 // Upper bound for tokens.
	thresh float64 // Retries are throttled while tokens <= thresh.
	ratio  float64 // Tokens added back per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capping it at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1530
1531type channelzChannel struct {
1532 cc *ClientConn
1533}
1534
1535func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1536 return c.cc.channelzMetric()
1537}
1538
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users. It is kept only so that existing references to it
// continue to compile.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")