blob: 34cc4c948db0d1ebf2de906d7b1c02652315b3fe [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "reflect"
27 "strings"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/base"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/connectivity"
36 "google.golang.org/grpc/credentials"
37 "google.golang.org/grpc/internal/backoff"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/grpcsync"
40 "google.golang.org/grpc/internal/grpcutil"
41 iresolver "google.golang.org/grpc/internal/resolver"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
52)
53
const (
	// minConnectTimeout is the minimum amount of time a connection is given
	// to complete.
	minConnectTimeout = time.Second * 20
	// grpclbName is the grpclb balancer name; it has to stay in sync with
	// grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
60
61var (
62 // ErrClientConnClosing indicates that the operation is illegal because
63 // the ClientConn is closing.
64 //
65 // Deprecated: this error should not be relied upon by users; use the status
66 // code of Canceled instead.
67 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69 errConnDrain = errors.New("grpc: the connection is drained")
70 // errConnClosing indicates that the connection is closing.
71 errConnClosing = errors.New("grpc: the connection is closing")
72 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
73 // service config.
74 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
75)
76
// Errors returned from Dial and DialContext.
var (
	// errNoTransportSecurity: no transport security was configured for the
	// ClientConn. Callers must either provide credentials or opt out of
	// security explicitly with the WithInsecure DialOption.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle: a credentials bundle was combined with
	// individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing: security information that requires a
	// secure connection (e.g., an OAuth2 token) was going to be transmitted
	// over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict: both grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were supplied for the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
94
const (
	// defaultClientMaxReceiveMessageSize is the default receive message size
	// limit for clients (4 MiB).
	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultClientMaxSendMessageSize is the default send message size limit
	// for clients.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize is the default buffer size used for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the default read buffer size.
	defaultReadBufSize = 32 * 1024
)
102
103// Dial creates a client connection to the given target.
104func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105 return DialContext(context.Background(), target, opts...)
106}
107
108type defaultConfigSelector struct {
109 sc *ServiceConfig
110}
111
112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113 return &iresolver.RPCConfig{
114 Context: rpcInfo.Context,
115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116 }, nil
117}
118
119// DialContext creates a client connection to the given target. By default, it's
120// a non-blocking dial (the function won't wait for connections to be
121// established, and connecting happens in the background). To make it a blocking
122// dial, use WithBlock() dial option.
123//
124// In the non-blocking case, the ctx does not act against the connection. It
125// only controls the setup steps.
126//
127// In the blocking case, ctx can be used to cancel or expire the pending
128// connection. Once this function returns, the cancellation and expiration of
129// ctx will be noop. Users should call ClientConn.Close to terminate all the
130// pending operations after this function returns.
131//
132// The target name syntax is defined in
133// https://github.com/grpc/grpc/blob/master/doc/naming.md.
134// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
135func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
136 cc := &ClientConn{
137 target: target,
138 csMgr: &connectivityStateManager{},
139 conns: make(map[*addrConn]struct{}),
140 dopts: defaultDialOptions(),
141 blockingpicker: newPickerWrapper(),
142 czData: new(channelzData),
143 firstResolveEvent: grpcsync.NewEvent(),
144 }
145 cc.retryThrottler.Store((*retryThrottler)(nil))
146 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
147 cc.ctx, cc.cancel = context.WithCancel(context.Background())
148
149 for _, opt := range opts {
150 opt.apply(&cc.dopts)
151 }
152
153 chainUnaryClientInterceptors(cc)
154 chainStreamClientInterceptors(cc)
155
156 defer func() {
157 if err != nil {
158 cc.Close()
159 }
160 }()
161
162 if channelz.IsOn() {
163 if cc.dopts.channelzParentID != 0 {
164 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
165 channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
166 Desc: "Channel Created",
167 Severity: channelz.CtInfo,
168 Parent: &channelz.TraceEventDesc{
169 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
170 Severity: channelz.CtInfo,
171 },
172 })
173 } else {
174 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
175 channelz.Info(logger, cc.channelzID, "Channel Created")
176 }
177 cc.csMgr.channelzID = cc.channelzID
178 }
179
180 if !cc.dopts.insecure {
181 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
182 return nil, errNoTransportSecurity
183 }
184 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
185 return nil, errTransportCredsAndBundle
186 }
187 } else {
188 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
189 return nil, errCredentialsConflict
190 }
191 for _, cd := range cc.dopts.copts.PerRPCCredentials {
192 if cd.RequireTransportSecurity() {
193 return nil, errTransportCredentialsMissing
194 }
195 }
196 }
197
198 if cc.dopts.defaultServiceConfigRawJSON != nil {
199 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
200 if scpr.Err != nil {
201 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
202 }
203 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
204 }
205 cc.mkp = cc.dopts.copts.KeepaliveParams
206
207 if cc.dopts.copts.UserAgent != "" {
208 cc.dopts.copts.UserAgent += " " + grpcUA
209 } else {
210 cc.dopts.copts.UserAgent = grpcUA
211 }
212
213 if cc.dopts.timeout > 0 {
214 var cancel context.CancelFunc
215 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
216 defer cancel()
217 }
218 defer func() {
219 select {
220 case <-ctx.Done():
221 switch {
222 case ctx.Err() == err:
223 conn = nil
224 case err == nil || !cc.dopts.returnLastError:
225 conn, err = nil, ctx.Err()
226 default:
227 conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
228 }
229 default:
230 }
231 }()
232
233 scSet := false
234 if cc.dopts.scChan != nil {
235 // Try to get an initial service config.
236 select {
237 case sc, ok := <-cc.dopts.scChan:
238 if ok {
239 cc.sc = &sc
240 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
241 scSet = true
242 }
243 default:
244 }
245 }
246 if cc.dopts.bs == nil {
247 cc.dopts.bs = backoff.DefaultExponential
248 }
249
250 // Determine the resolver to use.
251 cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
252 channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
253 resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
254 if resolverBuilder == nil {
255 // If resolver builder is still nil, the parsed target's scheme is
256 // not registered. Fallback to default resolver and set Endpoint to
257 // the original target.
258 channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
259 cc.parsedTarget = resolver.Target{
260 Scheme: resolver.GetDefaultScheme(),
261 Endpoint: target,
262 }
263 resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
264 if resolverBuilder == nil {
265 return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
266 }
267 }
268
269 creds := cc.dopts.copts.TransportCredentials
270 if creds != nil && creds.Info().ServerName != "" {
271 cc.authority = creds.Info().ServerName
272 } else if cc.dopts.insecure && cc.dopts.authority != "" {
273 cc.authority = cc.dopts.authority
274 } else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
275 cc.authority = "localhost"
276 } else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
277 cc.authority = "localhost" + cc.parsedTarget.Endpoint
278 } else {
279 // Use endpoint from "scheme://authority/endpoint" as the default
280 // authority for ClientConn.
281 cc.authority = cc.parsedTarget.Endpoint
282 }
283
284 if cc.dopts.scChan != nil && !scSet {
285 // Blocking wait for the initial service config.
286 select {
287 case sc, ok := <-cc.dopts.scChan:
288 if ok {
289 cc.sc = &sc
290 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
291 }
292 case <-ctx.Done():
293 return nil, ctx.Err()
294 }
295 }
296 if cc.dopts.scChan != nil {
297 go cc.scWatcher()
298 }
299
300 var credsClone credentials.TransportCredentials
301 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
302 credsClone = creds.Clone()
303 }
304 cc.balancerBuildOpts = balancer.BuildOptions{
305 DialCreds: credsClone,
306 CredsBundle: cc.dopts.copts.CredsBundle,
307 Dialer: cc.dopts.copts.Dialer,
308 CustomUserAgent: cc.dopts.copts.UserAgent,
309 ChannelzParentID: cc.channelzID,
310 Target: cc.parsedTarget,
311 }
312
313 // Build the resolver.
314 rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
315 if err != nil {
316 return nil, fmt.Errorf("failed to build resolver: %v", err)
317 }
318 cc.mu.Lock()
319 cc.resolverWrapper = rWrapper
320 cc.mu.Unlock()
321
322 // A blocking dial blocks until the clientConn is ready.
323 if cc.dopts.block {
324 for {
325 cc.Connect()
326 s := cc.GetState()
327 if s == connectivity.Ready {
328 break
329 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
330 if err = cc.connectionError(); err != nil {
331 terr, ok := err.(interface {
332 Temporary() bool
333 })
334 if ok && !terr.Temporary() {
335 return nil, err
336 }
337 }
338 }
339 if !cc.WaitForStateChange(ctx, s) {
340 // ctx got timeout or canceled.
341 if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
342 return nil, err
343 }
344 return nil, ctx.Err()
345 }
346 }
347 }
348
349 return cc, nil
350}
351
352// chainUnaryClientInterceptors chains all unary client interceptors into one.
353func chainUnaryClientInterceptors(cc *ClientConn) {
354 interceptors := cc.dopts.chainUnaryInts
355 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
356 // be executed before any other chained interceptors.
357 if cc.dopts.unaryInt != nil {
358 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
359 }
360 var chainedInt UnaryClientInterceptor
361 if len(interceptors) == 0 {
362 chainedInt = nil
363 } else if len(interceptors) == 1 {
364 chainedInt = interceptors[0]
365 } else {
366 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
367 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
368 }
369 }
370 cc.dopts.unaryInt = chainedInt
371}
372
373// getChainUnaryInvoker recursively generate the chained unary invoker.
374func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
375 if curr == len(interceptors)-1 {
376 return finalInvoker
377 }
378 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
379 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
380 }
381}
382
383// chainStreamClientInterceptors chains all stream client interceptors into one.
384func chainStreamClientInterceptors(cc *ClientConn) {
385 interceptors := cc.dopts.chainStreamInts
386 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
387 // be executed before any other chained interceptors.
388 if cc.dopts.streamInt != nil {
389 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
390 }
391 var chainedInt StreamClientInterceptor
392 if len(interceptors) == 0 {
393 chainedInt = nil
394 } else if len(interceptors) == 1 {
395 chainedInt = interceptors[0]
396 } else {
397 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
398 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
399 }
400 }
401 cc.dopts.streamInt = chainedInt
402}
403
404// getChainStreamer recursively generate the chained client stream constructor.
405func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
406 if curr == len(interceptors)-1 {
407 return finalStreamer
408 }
409 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
410 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
411 }
412}
413
414// connectivityStateManager keeps the connectivity.State of ClientConn.
415// This struct will eventually be exported so the balancers can access it.
416type connectivityStateManager struct {
417 mu sync.Mutex
418 state connectivity.State
419 notifyChan chan struct{}
420 channelzID int64
421}
422
423// updateState updates the connectivity.State of ClientConn.
424// If there's a change it notifies goroutines waiting on state change to
425// happen.
426func (csm *connectivityStateManager) updateState(state connectivity.State) {
427 csm.mu.Lock()
428 defer csm.mu.Unlock()
429 if csm.state == connectivity.Shutdown {
430 return
431 }
432 if csm.state == state {
433 return
434 }
435 csm.state = state
436 channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
437 if csm.notifyChan != nil {
438 // There are other goroutines waiting on this channel.
439 close(csm.notifyChan)
440 csm.notifyChan = nil
441 }
442}
443
444func (csm *connectivityStateManager) getState() connectivity.State {
445 csm.mu.Lock()
446 defer csm.mu.Unlock()
447 return csm.state
448}
449
450func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
451 csm.mu.Lock()
452 defer csm.mu.Unlock()
453 if csm.notifyChan == nil {
454 csm.notifyChan = make(chan struct{})
455 }
456 return csm.notifyChan
457}
458
459// ClientConnInterface defines the functions clients need to perform unary and
460// streaming RPCs. It is implemented by *ClientConn, and is only intended to
461// be referenced by generated code.
462type ClientConnInterface interface {
463 // Invoke performs a unary RPC and returns after the response is received
464 // into reply.
465 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
466 // NewStream begins a streaming RPC.
467 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
468}
469
470// Assert *ClientConn implements ClientConnInterface.
471var _ ClientConnInterface = (*ClientConn)(nil)
472
473// ClientConn represents a virtual connection to a conceptual endpoint, to
474// perform RPCs.
475//
476// A ClientConn is free to have zero or more actual connections to the endpoint
477// based on configuration, load, etc. It is also free to determine which actual
478// endpoints to use and may change it every RPC, permitting client-side load
479// balancing.
480//
481// A ClientConn encapsulates a range of functionality including name
482// resolution, TCP connection establishment (with retries and backoff) and TLS
483// handshakes. It also handles errors on established connections by
484// re-resolving the name and reconnecting.
485type ClientConn struct {
486 ctx context.Context
487 cancel context.CancelFunc
488
489 target string
490 parsedTarget resolver.Target
491 authority string
492 dopts dialOptions
493 csMgr *connectivityStateManager
494
495 balancerBuildOpts balancer.BuildOptions
496 blockingpicker *pickerWrapper
497
498 safeConfigSelector iresolver.SafeConfigSelector
499
500 mu sync.RWMutex
501 resolverWrapper *ccResolverWrapper
502 sc *ServiceConfig
503 conns map[*addrConn]struct{}
504 // Keepalive parameter can be updated if a GoAway is received.
505 mkp keepalive.ClientParameters
506 curBalancerName string
507 balancerWrapper *ccBalancerWrapper
508 retryThrottler atomic.Value
509
510 firstResolveEvent *grpcsync.Event
511
512 channelzID int64 // channelz unique identification number
513 czData *channelzData
514
515 lceMu sync.Mutex // protects lastConnectionError
516 lastConnectionError error
517}
518
519// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
520// ctx expires. A true value is returned in former case and false in latter.
521//
522// Experimental
523//
524// Notice: This API is EXPERIMENTAL and may be changed or removed in a
525// later release.
526func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
527 ch := cc.csMgr.getNotifyChan()
528 if cc.csMgr.getState() != sourceState {
529 return true
530 }
531 select {
532 case <-ctx.Done():
533 return false
534 case <-ch:
535 return true
536 }
537}
538
539// GetState returns the connectivity.State of ClientConn.
540//
541// Experimental
542//
543// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
544// release.
545func (cc *ClientConn) GetState() connectivity.State {
546 return cc.csMgr.getState()
547}
548
549// Connect causes all subchannels in the ClientConn to attempt to connect if
550// the channel is idle. Does not wait for the connection attempts to begin
551// before returning.
552//
553// Experimental
554//
555// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
556// release.
557func (cc *ClientConn) Connect() {
558 cc.mu.Lock()
559 defer cc.mu.Unlock()
560 if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
561 return
562 }
563 for ac := range cc.conns {
564 go ac.connect()
565 }
566}
567
568func (cc *ClientConn) scWatcher() {
569 for {
570 select {
571 case sc, ok := <-cc.dopts.scChan:
572 if !ok {
573 return
574 }
575 cc.mu.Lock()
576 // TODO: load balance policy runtime change is ignored.
577 // We may revisit this decision in the future.
578 cc.sc = &sc
579 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
580 cc.mu.Unlock()
581 case <-cc.ctx.Done():
582 return
583 }
584 }
585}
586
587// waitForResolvedAddrs blocks until the resolver has provided addresses or the
588// context expires. Returns nil unless the context expires first; otherwise
589// returns a status error based on the context.
590func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
591 // This is on the RPC path, so we use a fast path to avoid the
592 // more-expensive "select" below after the resolver has returned once.
593 if cc.firstResolveEvent.HasFired() {
594 return nil
595 }
596 select {
597 case <-cc.firstResolveEvent.Done():
598 return nil
599 case <-ctx.Done():
600 return status.FromContextError(ctx.Err()).Err()
601 case <-cc.ctx.Done():
602 return ErrClientConnClosing
603 }
604}
605
606var emptyServiceConfig *ServiceConfig
607
608func init() {
609 cfg := parseServiceConfig("{}")
610 if cfg.Err != nil {
611 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
612 }
613 emptyServiceConfig = cfg.Config.(*ServiceConfig)
614}
615
616func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
617 if cc.sc != nil {
618 cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
619 return
620 }
621 if cc.dopts.defaultServiceConfig != nil {
622 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
623 } else {
624 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
625 }
626}
627
628func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
629 defer cc.firstResolveEvent.Fire()
630 cc.mu.Lock()
631 // Check if the ClientConn is already closed. Some fields (e.g.
632 // balancerWrapper) are set to nil when closing the ClientConn, and could
633 // cause nil pointer panic if we don't have this check.
634 if cc.conns == nil {
635 cc.mu.Unlock()
636 return nil
637 }
638
639 if err != nil {
640 // May need to apply the initial service config in case the resolver
641 // doesn't support service configs, or doesn't provide a service config
642 // with the new addresses.
643 cc.maybeApplyDefaultServiceConfig(nil)
644
645 if cc.balancerWrapper != nil {
646 cc.balancerWrapper.resolverError(err)
647 }
648
649 // No addresses are valid with err set; return early.
650 cc.mu.Unlock()
651 return balancer.ErrBadResolverState
652 }
653
654 var ret error
655 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
656 cc.maybeApplyDefaultServiceConfig(s.Addresses)
657 // TODO: do we need to apply a failing LB policy if there is no
658 // default, per the error handling design?
659 } else {
660 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
661 configSelector := iresolver.GetConfigSelector(s)
662 if configSelector != nil {
663 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
664 channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
665 }
666 } else {
667 configSelector = &defaultConfigSelector{sc}
668 }
669 cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
670 } else {
671 ret = balancer.ErrBadResolverState
672 if cc.balancerWrapper == nil {
673 var err error
674 if s.ServiceConfig.Err != nil {
675 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
676 } else {
677 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
678 }
679 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
680 cc.blockingpicker.updatePicker(base.NewErrPicker(err))
681 cc.csMgr.updateState(connectivity.TransientFailure)
682 cc.mu.Unlock()
683 return ret
684 }
685 }
686 }
687
688 var balCfg serviceconfig.LoadBalancingConfig
689 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
690 balCfg = cc.sc.lbConfig.cfg
691 }
692
693 cbn := cc.curBalancerName
694 bw := cc.balancerWrapper
695 cc.mu.Unlock()
696 if cbn != grpclbName {
697 // Filter any grpclb addresses since we don't have the grpclb balancer.
698 for i := 0; i < len(s.Addresses); {
699 if s.Addresses[i].Type == resolver.GRPCLB {
700 copy(s.Addresses[i:], s.Addresses[i+1:])
701 s.Addresses = s.Addresses[:len(s.Addresses)-1]
702 continue
703 }
704 i++
705 }
706 }
707 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
708 if ret == nil {
709 ret = uccsErr // prefer ErrBadResolver state since any other error is
710 // currently meaningless to the caller.
711 }
712 return ret
713}
714
715// switchBalancer starts the switching from current balancer to the balancer
716// with the given name.
717//
718// It will NOT send the current address list to the new balancer. If needed,
719// caller of this function should send address list to the new balancer after
720// this function returns.
721//
722// Caller must hold cc.mu.
723func (cc *ClientConn) switchBalancer(name string) {
724 if strings.EqualFold(cc.curBalancerName, name) {
725 return
726 }
727
728 channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
729 if cc.dopts.balancerBuilder != nil {
730 channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
731 return
732 }
733 if cc.balancerWrapper != nil {
734 // Don't hold cc.mu while closing the balancers. The balancers may call
735 // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
736 // would cause a deadlock in that case.
737 cc.mu.Unlock()
738 cc.balancerWrapper.close()
739 cc.mu.Lock()
740 }
741
742 builder := balancer.Get(name)
743 if builder == nil {
744 channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
745 channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
746 builder = newPickfirstBuilder()
747 } else {
748 channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
749 }
750
751 cc.curBalancerName = builder.Name()
752 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
753}
754
755func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
756 cc.mu.Lock()
757 if cc.conns == nil {
758 cc.mu.Unlock()
759 return
760 }
761 // TODO(bar switching) send updates to all balancer wrappers when balancer
762 // gracefully switching is supported.
763 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
764 cc.mu.Unlock()
765}
766
767// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
768//
769// Caller needs to make sure len(addrs) > 0.
770func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
771 ac := &addrConn{
772 state: connectivity.Idle,
773 cc: cc,
774 addrs: addrs,
775 scopts: opts,
776 dopts: cc.dopts,
777 czData: new(channelzData),
778 resetBackoff: make(chan struct{}),
779 }
780 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
781 // Track ac in cc. This needs to be done before any getTransport(...) is called.
782 cc.mu.Lock()
783 if cc.conns == nil {
784 cc.mu.Unlock()
785 return nil, ErrClientConnClosing
786 }
787 if channelz.IsOn() {
788 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
789 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
790 Desc: "Subchannel Created",
791 Severity: channelz.CtInfo,
792 Parent: &channelz.TraceEventDesc{
793 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
794 Severity: channelz.CtInfo,
795 },
796 })
797 }
798 cc.conns[ac] = struct{}{}
799 cc.mu.Unlock()
800 return ac, nil
801}
802
803// removeAddrConn removes the addrConn in the subConn from clientConn.
804// It also tears down the ac with the given error.
805func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
806 cc.mu.Lock()
807 if cc.conns == nil {
808 cc.mu.Unlock()
809 return
810 }
811 delete(cc.conns, ac)
812 cc.mu.Unlock()
813 ac.tearDown(err)
814}
815
816func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
817 return &channelz.ChannelInternalMetric{
818 State: cc.GetState(),
819 Target: cc.target,
820 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
821 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
822 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
823 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
824 }
825}
826
827// Target returns the target string of the ClientConn.
828//
829// Experimental
830//
831// Notice: This API is EXPERIMENTAL and may be changed or removed in a
832// later release.
833func (cc *ClientConn) Target() string {
834 return cc.target
835}
836
837func (cc *ClientConn) incrCallsStarted() {
838 atomic.AddInt64(&cc.czData.callsStarted, 1)
839 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
840}
841
842func (cc *ClientConn) incrCallsSucceeded() {
843 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
844}
845
846func (cc *ClientConn) incrCallsFailed() {
847 atomic.AddInt64(&cc.czData.callsFailed, 1)
848}
849
850// connect starts creating a transport.
851// It does nothing if the ac is not IDLE.
852// TODO(bar) Move this to the addrConn section.
853func (ac *addrConn) connect() error {
854 ac.mu.Lock()
855 if ac.state == connectivity.Shutdown {
856 ac.mu.Unlock()
857 return errConnClosing
858 }
859 if ac.state != connectivity.Idle {
860 ac.mu.Unlock()
861 return nil
862 }
863 // Update connectivity state within the lock to prevent subsequent or
864 // concurrent calls from resetting the transport more than once.
865 ac.updateConnectivityState(connectivity.Connecting, nil)
866 ac.mu.Unlock()
867
868 ac.resetTransport()
869 return nil
870}
871
872// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
873//
874// If ac is Connecting, it returns false. The caller should tear down the ac and
875// create a new one. Note that the backoff will be reset when this happens.
876//
877// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
878// addresses will be picked up by retry in the next iteration after backoff.
879//
880// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
881//
882// If ac is Ready, it checks whether current connected address of ac is in the
883// new addrs list.
884// - If true, it updates ac.addrs and returns true. The ac will keep using
885// the existing connection.
886// - If false, it does nothing and returns false.
887func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
888 ac.mu.Lock()
889 defer ac.mu.Unlock()
890 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
891 if ac.state == connectivity.Shutdown ||
892 ac.state == connectivity.TransientFailure ||
893 ac.state == connectivity.Idle {
894 ac.addrs = addrs
895 return true
896 }
897
898 if ac.state == connectivity.Connecting {
899 return false
900 }
901
902 // ac.state is Ready, try to find the connected address.
903 var curAddrFound bool
904 for _, a := range addrs {
905 // a.ServerName takes precedent over ClientConn authority, if present.
906 if a.ServerName == "" {
907 a.ServerName = ac.cc.authority
908 }
909 if reflect.DeepEqual(ac.curAddr, a) {
910 curAddrFound = true
911 break
912 }
913 }
914 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
915 if curAddrFound {
916 ac.addrs = addrs
917 }
918
919 return curAddrFound
920}
921
922func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
923 if sc == nil {
924 return MethodConfig{}
925 }
926 if m, ok := sc.Methods[method]; ok {
927 return m
928 }
929 i := strings.LastIndex(method, "/")
930 if m, ok := sc.Methods[method[:i+1]]; ok {
931 return m
932 }
933 return sc.Methods[""]
934}
935
936// GetMethodConfig gets the method config of the input method.
937// If there's an exact match for input method (i.e. /service/method), we return
938// the corresponding MethodConfig.
939// If there isn't an exact match for the input method, we look for the service's default
940// config under the service (i.e /service/) and then for the default for all services (empty string).
941//
942// If there is a default MethodConfig for the service, we return it.
943// Otherwise, we return an empty MethodConfig.
944func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
945 // TODO: Avoid the locking here.
946 cc.mu.RLock()
947 defer cc.mu.RUnlock()
948 return getMethodConfig(cc.sc, method)
949}
950
951func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
952 cc.mu.RLock()
953 defer cc.mu.RUnlock()
954 if cc.sc == nil {
955 return nil
956 }
957 return cc.sc.healthCheckConfig
958}
959
960func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
961 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
962 Ctx: ctx,
963 FullMethodName: method,
964 })
965 if err != nil {
966 return nil, nil, toRPCErr(err)
967 }
968 return t, done, nil
969}
970
971func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
972 if sc == nil {
973 // should never reach here.
974 return
975 }
976 cc.sc = sc
977 if configSelector != nil {
978 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
979 }
980
981 if cc.sc.retryThrottling != nil {
982 newThrottler := &retryThrottler{
983 tokens: cc.sc.retryThrottling.MaxTokens,
984 max: cc.sc.retryThrottling.MaxTokens,
985 thresh: cc.sc.retryThrottling.MaxTokens / 2,
986 ratio: cc.sc.retryThrottling.TokenRatio,
987 }
988 cc.retryThrottler.Store(newThrottler)
989 } else {
990 cc.retryThrottler.Store((*retryThrottler)(nil))
991 }
992
993 if cc.dopts.balancerBuilder == nil {
994 // Only look at balancer types and switch balancer if balancer dial
995 // option is not set.
996 var newBalancerName string
997 if cc.sc != nil && cc.sc.lbConfig != nil {
998 newBalancerName = cc.sc.lbConfig.name
999 } else {
1000 var isGRPCLB bool
1001 for _, a := range addrs {
1002 if a.Type == resolver.GRPCLB {
1003 isGRPCLB = true
1004 break
1005 }
1006 }
1007 if isGRPCLB {
1008 newBalancerName = grpclbName
1009 } else if cc.sc != nil && cc.sc.LB != nil {
1010 newBalancerName = *cc.sc.LB
1011 } else {
1012 newBalancerName = PickFirstBalancerName
1013 }
1014 }
1015 cc.switchBalancer(newBalancerName)
1016 } else if cc.balancerWrapper == nil {
1017 // Balancer dial option was set, and this is the first time handling
1018 // resolved addresses. Build a balancer with dopts.balancerBuilder.
1019 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
1020 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
1021 }
1022}
1023
1024func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1025 cc.mu.RLock()
1026 r := cc.resolverWrapper
1027 cc.mu.RUnlock()
1028 if r == nil {
1029 return
1030 }
1031 go r.resolveNow(o)
1032}
1033
1034// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1035// them to attempt another connection immediately. It also resets the backoff
1036// times used for subsequent attempts regardless of the current state.
1037//
1038// In general, this function should not be used. Typical service or network
1039// outages result in a reasonable client reconnection strategy by default.
1040// However, if a previously unavailable network becomes available, this may be
1041// used to trigger an immediate reconnect.
1042//
1043// Experimental
1044//
1045// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1046// later release.
1047func (cc *ClientConn) ResetConnectBackoff() {
1048 cc.mu.Lock()
1049 conns := cc.conns
1050 cc.mu.Unlock()
1051 for ac := range conns {
1052 ac.resetConnectBackoff()
1053 }
1054}
1055
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing when cc.conns is already nil, which is the
// state Close itself leaves behind, and nil otherwise.
func (cc *ClientConn) Close() error {
	// Runs last, after everything below has been shut down.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach the connection set, the resolver wrapper and the balancer
	// wrapper under the lock so they can be closed below without holding
	// cc.mu.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if bWrapper != nil {
		bWrapper.close()
	}
	if rWrapper != nil {
		rWrapper.close()
	}

	// conns is no longer reachable through cc, so it is ranged over without
	// the lock.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtInfo,
		}
		// A non-zero parent ID means this is a nested channel; attach an
		// event for the parent as well.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtInfo,
			}
		}
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1105
// addrConn is a network connection to a given address.
//
// In the methods visible in this file, mu is held while reading or writing
// transport, curAddr, addrs, state, backoffIdx and resetBackoff.
type addrConn struct {
	// ctx is the parent of the per-attempt connect and health-check contexts
	// created in createTransport; cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn                // Receives state changes and re-resolution requests from this addrConn.
	dopts  dialOptions                // Dial options; copts, bs and minConnectTimeout are read when connecting.
	acbw   balancer.SubConn           // Passed to cc.handleSubConnStateChange on every state change.
	scopts balancer.NewSubConnOptions // CredsBundle and HealthCheckEnabled are consulted when connecting.

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed and replaced by resetConnectBackoff; a pending
	// backoff wait in resetTransport ends early when it is closed.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData // Call counters updated atomically by the incrCalls* methods.
}
1135
1136// Note: this requires a lock on ac.mu.
1137func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1138 if ac.state == s {
1139 return
1140 }
1141 ac.state = s
1142 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
1143 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1144}
1145
1146// adjustParams updates parameters used to create transports upon
1147// receiving a GoAway.
1148func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1149 switch r {
1150 case transport.GoAwayTooManyPings:
1151 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1152 ac.cc.mu.Lock()
1153 if v > ac.cc.mkp.Time {
1154 ac.cc.mkp.Time = v
1155 }
1156 ac.cc.mu.Unlock()
1157 }
1158}
1159
// resetTransport makes one attempt to connect to the addresses currently in
// ac.addrs.
//
// It does nothing if ac is already SHUTDOWN. Otherwise it moves ac to
// CONNECTING and calls tryAllAddrs with a deadline of now plus the larger of
// the minimum connect timeout and the current backoff duration.
//
// On failure it asks the ClientConn to re-resolve, moves ac to
// TRANSIENT_FAILURE and waits for the backoff timer, a resetBackoff signal or
// cancellation of ac.ctx. Unless ac.ctx was cancelled or ac was shut down in
// the meantime, ac then moves to IDLE. On success the backoff index is reset
// to zero.
func (ac *addrConn) resetTransport() {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}

	// Snapshot everything needed for this attempt while holding ac.mu.
	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Capture the channel under the lock: resetConnectBackoff closes this
		// one and installs a fresh one.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// The full backoff elapsed; use a longer one next time.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// Backoff was reset; stop waiting without advancing backoffIdx.
			timer.Stop()
		case <-ac.ctx.Done():
			// ac.ctx was cancelled; leave the state untouched.
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1230
1231// tryAllAddrs tries to creates a connection to the addresses, and stop when at
1232// the first successful one. It returns an error if no address was successfully
1233// connected, or updates ac appropriately with the new transport.
1234func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
1235 var firstConnErr error
1236 for _, addr := range addrs {
1237 ac.mu.Lock()
1238 if ac.state == connectivity.Shutdown {
1239 ac.mu.Unlock()
1240 return errConnClosing
1241 }
1242
1243 ac.cc.mu.RLock()
1244 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1245 ac.cc.mu.RUnlock()
1246
1247 copts := ac.dopts.copts
1248 if ac.scopts.CredsBundle != nil {
1249 copts.CredsBundle = ac.scopts.CredsBundle
1250 }
1251 ac.mu.Unlock()
1252
1253 channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1254
1255 err := ac.createTransport(addr, copts, connectDeadline)
1256 if err == nil {
1257 return nil
1258 }
1259 if firstConnErr == nil {
1260 firstConnErr = err
1261 }
1262 ac.cc.updateConnectionError(err)
1263 }
1264
1265 // Couldn't connect to any address.
1266 return firstConnErr
1267}
1268
1269// createTransport creates a connection to addr. It returns an error if the
1270// address was not successfully connected, or updates ac appropriately with the
1271// new transport.
1272func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
1273 // TODO: Delete prefaceReceived and move the logic to wait for it into the
1274 // transport.
1275 prefaceReceived := grpcsync.NewEvent()
1276 connClosed := grpcsync.NewEvent()
1277
1278 // addr.ServerName takes precedent over ClientConn authority, if present.
1279 if addr.ServerName == "" {
1280 addr.ServerName = ac.cc.authority
1281 }
1282
1283 hctx, hcancel := context.WithCancel(ac.ctx)
1284 hcStarted := false // protected by ac.mu
1285
1286 onClose := func() {
1287 ac.mu.Lock()
1288 defer ac.mu.Unlock()
1289 defer connClosed.Fire()
1290 if !hcStarted || hctx.Err() != nil {
1291 // We didn't start the health check or set the state to READY, so
1292 // no need to do anything else here.
1293 //
1294 // OR, we have already cancelled the health check context, meaning
1295 // we have already called onClose once for this transport. In this
1296 // case it would be dangerous to clear the transport and update the
1297 // state, since there may be a new transport in this addrConn.
1298 return
1299 }
1300 hcancel()
1301 ac.transport = nil
1302 // Refresh the name resolver
1303 ac.cc.resolveNow(resolver.ResolveNowOptions{})
1304 if ac.state != connectivity.Shutdown {
1305 ac.updateConnectivityState(connectivity.Idle, nil)
1306 }
1307 }
1308
1309 onGoAway := func(r transport.GoAwayReason) {
1310 ac.mu.Lock()
1311 ac.adjustParams(r)
1312 ac.mu.Unlock()
1313 onClose()
1314 }
1315
1316 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1317 defer cancel()
1318 if channelz.IsOn() {
1319 copts.ChannelzParentID = ac.channelzID
1320 }
1321
1322 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
1323 if err != nil {
1324 // newTr is either nil, or closed.
1325 channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
1326 return err
1327 }
1328
1329 select {
1330 case <-connectCtx.Done():
1331 // We didn't get the preface in time.
1332 // The error we pass to Close() is immaterial since there are no open
1333 // streams at this point, so no trailers with error details will be sent
1334 // out. We just need to pass a non-nil error.
1335 newTr.Close(transport.ErrConnClosing)
1336 if connectCtx.Err() == context.DeadlineExceeded {
1337 err := errors.New("failed to receive server preface within timeout")
1338 channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
1339 return err
1340 }
1341 return nil
1342 case <-prefaceReceived.Done():
1343 // We got the preface - huzzah! things are good.
1344 ac.mu.Lock()
1345 defer ac.mu.Unlock()
1346 if connClosed.HasFired() {
1347 // onClose called first; go idle but do nothing else.
1348 if ac.state != connectivity.Shutdown {
1349 ac.updateConnectivityState(connectivity.Idle, nil)
1350 }
1351 return nil
1352 }
1353 if ac.state == connectivity.Shutdown {
1354 // This can happen if the subConn was removed while in `Connecting`
1355 // state. tearDown() would have set the state to `Shutdown`, but
1356 // would not have closed the transport since ac.transport would not
1357 // been set at that point.
1358 //
1359 // We run this in a goroutine because newTr.Close() calls onClose()
1360 // inline, which requires locking ac.mu.
1361 //
1362 // The error we pass to Close() is immaterial since there are no open
1363 // streams at this point, so no trailers with error details will be sent
1364 // out. We just need to pass a non-nil error.
1365 go newTr.Close(transport.ErrConnClosing)
1366 return nil
1367 }
1368 ac.curAddr = addr
1369 ac.transport = newTr
1370 hcStarted = true
1371 ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
1372 return nil
1373 case <-connClosed.Done():
1374 // The transport has already closed. If we received the preface, too,
1375 // this is not an error.
1376 select {
1377 case <-prefaceReceived.Done():
1378 return nil
1379 default:
1380 return errors.New("connection closed before server preface received")
1381 }
1382 }
1383}
1384
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// ctx is passed to the health check function and to the stream it opens.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// Stays false on every early return below, in which case the deferred
	// function moves ac straight to READY.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers compare ac.transport against the transport that was
	// current when the health check started and refuse to act once it has
	// been replaced or cleared.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	// It runs on its own goroutine; its error is only logged.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1457
1458func (ac *addrConn) resetConnectBackoff() {
1459 ac.mu.Lock()
1460 close(ac.resetBackoff)
1461 ac.backoffIdx = 0
1462 ac.resetBackoff = make(chan struct{})
1463 ac.mu.Unlock()
1464}
1465
1466// getReadyTransport returns the transport if ac's state is READY or nil if not.
1467func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1468 ac.mu.Lock()
1469 defer ac.mu.Unlock()
1470 if ac.state == connectivity.Ready {
1471 return ac.transport
1472 }
1473 return nil
1474}
1475
1476// tearDown starts to tear down the addrConn.
1477//
1478// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
1479// will leak. In most cases, call cc.removeAddrConn() instead.
1480func (ac *addrConn) tearDown(err error) {
1481 ac.mu.Lock()
1482 if ac.state == connectivity.Shutdown {
1483 ac.mu.Unlock()
1484 return
1485 }
1486 curTr := ac.transport
1487 ac.transport = nil
1488 // We have to set the state to Shutdown before anything else to prevent races
1489 // between setting the state and logic that waits on context cancellation / etc.
1490 ac.updateConnectivityState(connectivity.Shutdown, nil)
1491 ac.cancel()
1492 ac.curAddr = resolver.Address{}
1493 if err == errConnDrain && curTr != nil {
1494 // GracefulClose(...) may be executed multiple times when
1495 // i) receiving multiple GoAway frames from the server; or
1496 // ii) there are concurrent name resolver/Balancer triggered
1497 // address removal and GoAway.
1498 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1499 ac.mu.Unlock()
1500 curTr.GracefulClose()
1501 ac.mu.Lock()
1502 }
1503 if channelz.IsOn() {
1504 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
1505 Desc: "Subchannel Deleted",
1506 Severity: channelz.CtInfo,
1507 Parent: &channelz.TraceEventDesc{
1508 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1509 Severity: channelz.CtInfo,
1510 },
1511 })
1512 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1513 // the entity being deleted, and thus prevent it from being deleted right away.
1514 channelz.RemoveEntry(ac.channelzID)
1515 }
1516 ac.mu.Unlock()
1517}
1518
1519func (ac *addrConn) getState() connectivity.State {
1520 ac.mu.Lock()
1521 defer ac.mu.Unlock()
1522 return ac.state
1523}
1524
1525func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1526 ac.mu.Lock()
1527 addr := ac.curAddr.Addr
1528 ac.mu.Unlock()
1529 return &channelz.ChannelInternalMetric{
1530 State: ac.getState(),
1531 Target: addr,
1532 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1533 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1534 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1535 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1536 }
1537}
1538
1539func (ac *addrConn) incrCallsStarted() {
1540 atomic.AddInt64(&ac.czData.callsStarted, 1)
1541 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1542}
1543
1544func (ac *addrConn) incrCallsSucceeded() {
1545 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1546}
1547
1548func (ac *addrConn) incrCallsFailed() {
1549 atomic.AddInt64(&ac.czData.callsFailed, 1)
1550}
1551
// retryThrottler is a token bucket that decides whether retries are allowed.
// Every call to throttle spends one token and every successful RPC earns
// ratio tokens; the balance is kept within [0, max].
type retryThrottler struct {
	max    float64 // Upper bound for tokens.
	thresh float64 // Retries are throttled while tokens <= thresh.
	ratio  float64 // Tokens added per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
//
// A nil receiver never throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capping the balance at
// max. It is a no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1588
1589type channelzChannel struct {
1590 cc *ClientConn
1591}
1592
1593func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1594 return c.cc.channelzMetric()
1595}
1596
// ErrClientConnTimeout indicates that the ClientConn could not establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1603
1604func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1605 for _, rb := range cc.dopts.resolvers {
1606 if scheme == rb.Scheme() {
1607 return rb
1608 }
1609 }
1610 return resolver.Get(scheme)
1611}
1612
1613func (cc *ClientConn) updateConnectionError(err error) {
1614 cc.lceMu.Lock()
1615 cc.lastConnectionError = err
1616 cc.lceMu.Unlock()
1617}
1618
1619func (cc *ClientConn) connectionError() error {
1620 cc.lceMu.Lock()
1621 defer cc.lceMu.Unlock()
1622 return cc.lastConnectionError
1623}