blob: 28f09dc87073da442b3dcabd37d7ce5ef4582bea [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
khenaidoo5cb0d402021-12-08 14:09:16 -050026 "net/url"
khenaidoo5fc5cea2021-08-11 17:39:16 -040027 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/internal/backoff"
39 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/grpcsync"
khenaidoo5fc5cea2021-08-11 17:39:16 -040041 iresolver "google.golang.org/grpc/internal/resolver"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
52)
53
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer; it must match grpclbName
	// in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
60
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing. It is a status error with code Canceled.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection has started draining and does
	// not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the JSON parsing
	// error for the default service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
76
// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errNoTransportCredsInBundle indicates that the configured creds bundle
	// returned nil transport credentials.
	errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., OAuth2 token) which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
94
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is math.MaxInt32.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are 32 KiB each.
	// NOTE(review): the previous comment described "http2IOBufSize ... the
	// buffer size for sending frames", a name that is not declared here;
	// presumably these are the default transport write/read buffer sizes —
	// confirm against the dial options that consume them.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
102
103// Dial creates a client connection to the given target.
104func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105 return DialContext(context.Background(), target, opts...)
106}
107
// defaultConfigSelector is a config selector that resolves per-RPC method
// configuration directly from a ServiceConfig.
type defaultConfigSelector struct {
	// sc is the service config consulted by SelectConfig; it may be nil, in
	// which case getMethodConfig yields an empty MethodConfig.
	sc *ServiceConfig
}
111
112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113 return &iresolver.RPCConfig{
114 Context: rpcInfo.Context,
115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116 }, nil
117}
118
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any error return the partially constructed ClientConn is closed and a nil
// connection is returned.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	// The ClientConn's own context is rooted at Background, not at the
	// caller's ctx, so it is not cancelled when ctx is.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply the caller-supplied options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close the ClientConn if this function ends up returning an error. This
	// reads the named result err, so it also sees errors set by the deferred
	// function registered further below (which runs first).
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register with channelz when it is enabled; a non-zero channelzParentID
	// registers this channel as a nested channel of that parent.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credentials configuration: exactly one of transport
	// credentials or a credentials bundle must be set, and a bundle must
	// provide non-nil transport credentials.
	if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
		return nil, errNoTransportSecurity
	}
	if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
		return nil, errTransportCredsAndBundle
	}
	if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
		return nil, errNoTransportCredsInBundle
	}
	transportCreds := cc.dopts.copts.TransportCredentials
	if transportCreds == nil {
		transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
	}
	// Per-RPC credentials that require transport security cannot be used
	// over credentials whose security protocol is "insecure".
	if transportCreds.Info().SecurityProtocol == "insecure" {
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config JSON, if one was supplied.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Append the gRPC user agent to any user-specified one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// A dial timeout option bounds ctx for the remainder of this function.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done by the time this function returns, force a nil conn and
	// report the context error: on its own when there is no other error or
	// returnLastError is unset, otherwise prefixed to the existing error.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	// Fall back to the default exponential backoff strategy.
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	if err != nil {
		return nil, err
	}
	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
	if err != nil {
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep watching the channel for later service config updates.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Hand the balancer a clone of the transport credentials rather than the
	// caller's instance.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		Authority:        cc.authority,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			cc.Connect()
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only when the last connection error
				// reports itself as non-temporary.
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
332
333// chainUnaryClientInterceptors chains all unary client interceptors into one.
334func chainUnaryClientInterceptors(cc *ClientConn) {
335 interceptors := cc.dopts.chainUnaryInts
336 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
337 // be executed before any other chained interceptors.
338 if cc.dopts.unaryInt != nil {
339 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
340 }
341 var chainedInt UnaryClientInterceptor
342 if len(interceptors) == 0 {
343 chainedInt = nil
344 } else if len(interceptors) == 1 {
345 chainedInt = interceptors[0]
346 } else {
347 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
348 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
349 }
350 }
351 cc.dopts.unaryInt = chainedInt
352}
353
354// getChainUnaryInvoker recursively generate the chained unary invoker.
355func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
356 if curr == len(interceptors)-1 {
357 return finalInvoker
358 }
359 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
360 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
361 }
362}
363
364// chainStreamClientInterceptors chains all stream client interceptors into one.
365func chainStreamClientInterceptors(cc *ClientConn) {
366 interceptors := cc.dopts.chainStreamInts
367 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
368 // be executed before any other chained interceptors.
369 if cc.dopts.streamInt != nil {
370 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
371 }
372 var chainedInt StreamClientInterceptor
373 if len(interceptors) == 0 {
374 chainedInt = nil
375 } else if len(interceptors) == 1 {
376 chainedInt = interceptors[0]
377 } else {
378 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
379 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
380 }
381 }
382 cc.dopts.streamInt = chainedInt
383}
384
385// getChainStreamer recursively generate the chained client stream constructor.
386func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
387 if curr == len(interceptors)-1 {
388 return finalStreamer
389 }
390 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
391 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
392 }
393}
394
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu guards state and notifyChan; every method in this file takes it
	// before touching them.
	mu sync.Mutex
	// state is the current connectivity state. Once it is Shutdown,
	// updateState ignores further changes.
	state connectivity.State
	// notifyChan is lazily created by getNotifyChan and closed (then reset to
	// nil) by updateState when the state changes.
	notifyChan chan struct{}
	// channelzID is the channelz ID used when logging state changes.
	channelzID int64
}
403
404// updateState updates the connectivity.State of ClientConn.
405// If there's a change it notifies goroutines waiting on state change to
406// happen.
407func (csm *connectivityStateManager) updateState(state connectivity.State) {
408 csm.mu.Lock()
409 defer csm.mu.Unlock()
410 if csm.state == connectivity.Shutdown {
411 return
412 }
413 if csm.state == state {
414 return
415 }
416 csm.state = state
417 channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
418 if csm.notifyChan != nil {
419 // There are other goroutines waiting on this channel.
420 close(csm.notifyChan)
421 csm.notifyChan = nil
422 }
423}
424
425func (csm *connectivityStateManager) getState() connectivity.State {
426 csm.mu.Lock()
427 defer csm.mu.Unlock()
428 return csm.state
429}
430
431func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
432 csm.mu.Lock()
433 defer csm.mu.Unlock()
434 if csm.notifyChan == nil {
435 csm.notifyChan = make(chan struct{})
436 }
437 return csm.notifyChan
438}
439
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC on the given method and returns after the
	// response is received into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC on the given method, described by desc.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
450
// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
453
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx and cancel are created in DialContext from context.Background();
	// ctx being done is treated as "the ClientConn is closing" by
	// scWatcher and waitForResolvedAddrs.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the dial target exactly as passed to DialContext.
	target string
	// parsedTarget is the parsed form of target.
	// NOTE(review): presumably populated by parseTargetAndFindResolver,
	// which is defined outside this view — confirm.
	parsedTarget resolver.Target
	// authority is set in DialContext from determineAuthority.
	authority string
	// dopts holds the default dial options with the caller's options applied.
	dopts dialOptions
	// csMgr tracks the channel's connectivity state.
	csMgr *connectivityStateManager

	// balancerBuildOpts are the options passed to newCCBalancerWrapper when
	// switching balancers.
	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	safeConfigSelector iresolver.SafeConfigSelector

	// mu is held by the methods in this file while they read or write
	// resolverWrapper, sc, conns, curBalancerName and balancerWrapper.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is the set of live addrConns. A nil map is used as the marker
	// that the ClientConn has been closed.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler is initialised in DialContext with a typed nil
	// *retryThrottler.
	retryThrottler atomic.Value

	// firstResolveEvent fires once updateResolverState has run for the first
	// time; waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
499
500// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
501// ctx expires. A true value is returned in former case and false in latter.
502//
503// Experimental
504//
505// Notice: This API is EXPERIMENTAL and may be changed or removed in a
506// later release.
507func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
508 ch := cc.csMgr.getNotifyChan()
509 if cc.csMgr.getState() != sourceState {
510 return true
511 }
512 select {
513 case <-ctx.Done():
514 return false
515 case <-ch:
516 return true
517 }
518}
519
520// GetState returns the connectivity.State of ClientConn.
521//
522// Experimental
523//
524// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
525// release.
526func (cc *ClientConn) GetState() connectivity.State {
527 return cc.csMgr.getState()
528}
529
530// Connect causes all subchannels in the ClientConn to attempt to connect if
531// the channel is idle. Does not wait for the connection attempts to begin
532// before returning.
533//
534// Experimental
535//
536// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
537// release.
538func (cc *ClientConn) Connect() {
539 cc.mu.Lock()
540 defer cc.mu.Unlock()
541 if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
542 return
543 }
544 for ac := range cc.conns {
545 go ac.connect()
546 }
547}
548
549func (cc *ClientConn) scWatcher() {
550 for {
551 select {
552 case sc, ok := <-cc.dopts.scChan:
553 if !ok {
554 return
555 }
556 cc.mu.Lock()
557 // TODO: load balance policy runtime change is ignored.
558 // We may revisit this decision in the future.
559 cc.sc = &sc
560 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
561 cc.mu.Unlock()
562 case <-cc.ctx.Done():
563 return
564 }
565 }
566}
567
568// waitForResolvedAddrs blocks until the resolver has provided addresses or the
569// context expires. Returns nil unless the context expires first; otherwise
570// returns a status error based on the context.
571func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
572 // This is on the RPC path, so we use a fast path to avoid the
573 // more-expensive "select" below after the resolver has returned once.
574 if cc.firstResolveEvent.HasFired() {
575 return nil
576 }
577 select {
578 case <-cc.firstResolveEvent.Done():
579 return nil
580 case <-ctx.Done():
581 return status.FromContextError(ctx.Err()).Err()
582 case <-cc.ctx.Done():
583 return ErrClientConnClosing
584 }
585}
586
587var emptyServiceConfig *ServiceConfig
588
589func init() {
590 cfg := parseServiceConfig("{}")
591 if cfg.Err != nil {
592 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
593 }
594 emptyServiceConfig = cfg.Config.(*ServiceConfig)
595}
596
597func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
598 if cc.sc != nil {
599 cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
600 return
601 }
602 if cc.dopts.defaultServiceConfig != nil {
603 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
604 } else {
605 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
606 }
607}
608
// updateResolverState processes a resolver update (state s, or error err) and
// forwards the result to the balancer.
//
// It returns nil if the ClientConn is already closed,
// balancer.ErrBadResolverState if err is non-nil or the resolver supplied an
// unusable service config, and otherwise whatever the balancer's
// updateClientConnState returns. firstResolveEvent is fired on every return
// path.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			// Valid service config: use the resolver's config selector if
			// it attached one, otherwise select from the config itself.
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			// Unusable service config (parse error or unexpected type).
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				// No balancer exists yet: install a picker that fails RPCs
				// with the error and move to TransientFailure.
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// The LB config from the service config is only passed along when no
	// balancer builder was forced through the dial options.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Snapshot the balancer name and wrapper under the lock, then release it
	// before calling into the balancer.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
698
699// switchBalancer starts the switching from current balancer to the balancer
700// with the given name.
701//
702// It will NOT send the current address list to the new balancer. If needed,
703// caller of this function should send address list to the new balancer after
704// this function returns.
705//
706// Caller must hold cc.mu.
707func (cc *ClientConn) switchBalancer(name string) {
708 if strings.EqualFold(cc.curBalancerName, name) {
709 return
710 }
711
712 channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
713 if cc.dopts.balancerBuilder != nil {
714 channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
715 return
716 }
717 if cc.balancerWrapper != nil {
718 // Don't hold cc.mu while closing the balancers. The balancers may call
719 // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
720 // would cause a deadlock in that case.
721 cc.mu.Unlock()
722 cc.balancerWrapper.close()
723 cc.mu.Lock()
724 }
725
726 builder := balancer.Get(name)
727 if builder == nil {
728 channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
729 channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
730 builder = newPickfirstBuilder()
731 } else {
732 channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
733 }
734
735 cc.curBalancerName = builder.Name()
736 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
737}
738
739func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
740 cc.mu.Lock()
741 if cc.conns == nil {
742 cc.mu.Unlock()
743 return
744 }
745 // TODO(bar switching) send updates to all balancer wrappers when balancer
746 // gracefully switching is supported.
747 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
748 cc.mu.Unlock()
749}
750
751// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
752//
753// Caller needs to make sure len(addrs) > 0.
754func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
755 ac := &addrConn{
756 state: connectivity.Idle,
757 cc: cc,
758 addrs: addrs,
759 scopts: opts,
760 dopts: cc.dopts,
761 czData: new(channelzData),
762 resetBackoff: make(chan struct{}),
763 }
764 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
765 // Track ac in cc. This needs to be done before any getTransport(...) is called.
766 cc.mu.Lock()
767 if cc.conns == nil {
768 cc.mu.Unlock()
769 return nil, ErrClientConnClosing
770 }
771 if channelz.IsOn() {
772 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
773 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
774 Desc: "Subchannel Created",
775 Severity: channelz.CtInfo,
776 Parent: &channelz.TraceEventDesc{
777 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
778 Severity: channelz.CtInfo,
779 },
780 })
781 }
782 cc.conns[ac] = struct{}{}
783 cc.mu.Unlock()
784 return ac, nil
785}
786
787// removeAddrConn removes the addrConn in the subConn from clientConn.
788// It also tears down the ac with the given error.
789func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
790 cc.mu.Lock()
791 if cc.conns == nil {
792 cc.mu.Unlock()
793 return
794 }
795 delete(cc.conns, ac)
796 cc.mu.Unlock()
797 ac.tearDown(err)
798}
799
800func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
801 return &channelz.ChannelInternalMetric{
802 State: cc.GetState(),
803 Target: cc.target,
804 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
805 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
806 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
807 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
808 }
809}
810
811// Target returns the target string of the ClientConn.
812//
813// Experimental
814//
815// Notice: This API is EXPERIMENTAL and may be changed or removed in a
816// later release.
817func (cc *ClientConn) Target() string {
818 return cc.target
819}
820
821func (cc *ClientConn) incrCallsStarted() {
822 atomic.AddInt64(&cc.czData.callsStarted, 1)
823 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
824}
825
826func (cc *ClientConn) incrCallsSucceeded() {
827 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
828}
829
830func (cc *ClientConn) incrCallsFailed() {
831 atomic.AddInt64(&cc.czData.callsFailed, 1)
832}
833
834// connect starts creating a transport.
835// It does nothing if the ac is not IDLE.
836// TODO(bar) Move this to the addrConn section.
837func (ac *addrConn) connect() error {
838 ac.mu.Lock()
839 if ac.state == connectivity.Shutdown {
840 ac.mu.Unlock()
841 return errConnClosing
842 }
843 if ac.state != connectivity.Idle {
844 ac.mu.Unlock()
845 return nil
846 }
847 // Update connectivity state within the lock to prevent subsequent or
848 // concurrent calls from resetting the transport more than once.
849 ac.updateConnectivityState(connectivity.Connecting, nil)
850 ac.mu.Unlock()
851
852 ac.resetTransport()
853 return nil
854}
855
856// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
857//
858// If ac is Connecting, it returns false. The caller should tear down the ac and
859// create a new one. Note that the backoff will be reset when this happens.
860//
861// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
862// addresses will be picked up by retry in the next iteration after backoff.
863//
864// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
865//
866// If ac is Ready, it checks whether current connected address of ac is in the
867// new addrs list.
868// - If true, it updates ac.addrs and returns true. The ac will keep using
869// the existing connection.
870// - If false, it does nothing and returns false.
871func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
872 ac.mu.Lock()
873 defer ac.mu.Unlock()
874 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
875 if ac.state == connectivity.Shutdown ||
876 ac.state == connectivity.TransientFailure ||
877 ac.state == connectivity.Idle {
878 ac.addrs = addrs
879 return true
880 }
881
882 if ac.state == connectivity.Connecting {
883 return false
884 }
885
886 // ac.state is Ready, try to find the connected address.
887 var curAddrFound bool
888 for _, a := range addrs {
khenaidoo5cb0d402021-12-08 14:09:16 -0500889 a.ServerName = ac.cc.getServerName(a)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400890 if reflect.DeepEqual(ac.curAddr, a) {
891 curAddrFound = true
892 break
893 }
894 }
895 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
896 if curAddrFound {
897 ac.addrs = addrs
898 }
899
900 return curAddrFound
901}
902
khenaidoo5cb0d402021-12-08 14:09:16 -0500903// getServerName determines the serverName to be used in the connection
904// handshake. The default value for the serverName is the authority on the
905// ClientConn, which either comes from the user's dial target or through an
906// authority override specified using the WithAuthority dial option. Name
907// resolvers can specify a per-address override for the serverName through the
908// resolver.Address.ServerName field which is used only if the WithAuthority
909// dial option was not used. The rationale is that per-address authority
910// overrides specified by the name resolver can represent a security risk, while
911// an override specified by the user is more dependable since they probably know
912// what they are doing.
913func (cc *ClientConn) getServerName(addr resolver.Address) string {
914 if cc.dopts.authority != "" {
915 return cc.dopts.authority
916 }
917 if addr.ServerName != "" {
918 return addr.ServerName
919 }
920 return cc.authority
921}
922
khenaidoo5fc5cea2021-08-11 17:39:16 -0400923func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
924 if sc == nil {
925 return MethodConfig{}
926 }
927 if m, ok := sc.Methods[method]; ok {
928 return m
929 }
930 i := strings.LastIndex(method, "/")
931 if m, ok := sc.Methods[method[:i+1]]; ok {
932 return m
933 }
934 return sc.Methods[""]
935}
936
937// GetMethodConfig gets the method config of the input method.
938// If there's an exact match for input method (i.e. /service/method), we return
939// the corresponding MethodConfig.
940// If there isn't an exact match for the input method, we look for the service's default
941// config under the service (i.e /service/) and then for the default for all services (empty string).
942//
943// If there is a default MethodConfig for the service, we return it.
944// Otherwise, we return an empty MethodConfig.
945func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
946 // TODO: Avoid the locking here.
947 cc.mu.RLock()
948 defer cc.mu.RUnlock()
949 return getMethodConfig(cc.sc, method)
950}
951
952func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
953 cc.mu.RLock()
954 defer cc.mu.RUnlock()
955 if cc.sc == nil {
956 return nil
957 }
958 return cc.sc.healthCheckConfig
959}
960
961func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
962 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
963 Ctx: ctx,
964 FullMethodName: method,
965 })
966 if err != nil {
967 return nil, nil, toRPCErr(err)
968 }
969 return t, done, nil
970}
971
972func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
973 if sc == nil {
974 // should never reach here.
975 return
976 }
977 cc.sc = sc
978 if configSelector != nil {
979 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
980 }
981
982 if cc.sc.retryThrottling != nil {
983 newThrottler := &retryThrottler{
984 tokens: cc.sc.retryThrottling.MaxTokens,
985 max: cc.sc.retryThrottling.MaxTokens,
986 thresh: cc.sc.retryThrottling.MaxTokens / 2,
987 ratio: cc.sc.retryThrottling.TokenRatio,
988 }
989 cc.retryThrottler.Store(newThrottler)
990 } else {
991 cc.retryThrottler.Store((*retryThrottler)(nil))
992 }
993
994 if cc.dopts.balancerBuilder == nil {
995 // Only look at balancer types and switch balancer if balancer dial
996 // option is not set.
997 var newBalancerName string
998 if cc.sc != nil && cc.sc.lbConfig != nil {
999 newBalancerName = cc.sc.lbConfig.name
1000 } else {
1001 var isGRPCLB bool
1002 for _, a := range addrs {
1003 if a.Type == resolver.GRPCLB {
1004 isGRPCLB = true
1005 break
1006 }
1007 }
1008 if isGRPCLB {
1009 newBalancerName = grpclbName
1010 } else if cc.sc != nil && cc.sc.LB != nil {
1011 newBalancerName = *cc.sc.LB
1012 } else {
1013 newBalancerName = PickFirstBalancerName
1014 }
1015 }
1016 cc.switchBalancer(newBalancerName)
1017 } else if cc.balancerWrapper == nil {
1018 // Balancer dial option was set, and this is the first time handling
1019 // resolved addresses. Build a balancer with dopts.balancerBuilder.
1020 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
1021 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
1022 }
1023}
1024
1025func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1026 cc.mu.RLock()
1027 r := cc.resolverWrapper
1028 cc.mu.RUnlock()
1029 if r == nil {
1030 return
1031 }
1032 go r.resolveNow(o)
1033}
1034
1035// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1036// them to attempt another connection immediately. It also resets the backoff
1037// times used for subsequent attempts regardless of the current state.
1038//
1039// In general, this function should not be used. Typical service or network
1040// outages result in a reasonable client reconnection strategy by default.
1041// However, if a previously unavailable network becomes available, this may be
1042// used to trigger an immediate reconnect.
1043//
1044// Experimental
1045//
1046// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1047// later release.
1048func (cc *ClientConn) ResetConnectBackoff() {
1049 cc.mu.Lock()
1050 conns := cc.conns
1051 cc.mu.Unlock()
1052 for ac := range conns {
1053 ac.resetConnectBackoff()
1054 }
1055}
1056
1057// Close tears down the ClientConn and all underlying connections.
1058func (cc *ClientConn) Close() error {
1059 defer cc.cancel()
1060
1061 cc.mu.Lock()
1062 if cc.conns == nil {
1063 cc.mu.Unlock()
1064 return ErrClientConnClosing
1065 }
1066 conns := cc.conns
1067 cc.conns = nil
1068 cc.csMgr.updateState(connectivity.Shutdown)
1069
1070 rWrapper := cc.resolverWrapper
1071 cc.resolverWrapper = nil
1072 bWrapper := cc.balancerWrapper
1073 cc.balancerWrapper = nil
1074 cc.mu.Unlock()
1075
1076 cc.blockingpicker.close()
1077
1078 if bWrapper != nil {
1079 bWrapper.close()
1080 }
1081 if rWrapper != nil {
1082 rWrapper.close()
1083 }
1084
1085 for ac := range conns {
1086 ac.tearDown(ErrClientConnClosing)
1087 }
1088 if channelz.IsOn() {
1089 ted := &channelz.TraceEventDesc{
1090 Desc: "Channel Deleted",
1091 Severity: channelz.CtInfo,
1092 }
1093 if cc.dopts.channelzParentID != 0 {
1094 ted.Parent = &channelz.TraceEventDesc{
1095 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1096 Severity: channelz.CtInfo,
1097 }
1098 }
1099 channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
1100 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1101 // the entity being deleted, and thus prevent it from being deleted right away.
1102 channelz.RemoveEntry(cc.channelzID)
1103 }
1104 return nil
1105}
1106
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is the parent of the connect and health-check contexts derived in
	// createTransport, and its cancellation aborts the backoff wait in
	// resetTransport. cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the owning ClientConn. dopts.copts is the base set of connect
	// options copied for every connection attempt. acbw is the SubConn handed
	// to cc.handleSubConnStateChange on state changes. scopts supplies the
	// optional CredsBundle override and the HealthCheckEnabled flag.
	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu is held by the methods of this type whenever they read or write
	// transport, curAddr, addrs, state, backoffIdx or resetBackoff.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the index passed to dopts.bs.Backoff; it is incremented
	// after a full backoff wait and zeroed on success or by
	// resetConnectBackoff. resetBackoff is closed (and replaced with a fresh
	// channel) by resetConnectBackoff to cut a pending backoff wait short.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// czData holds the call counters updated by the incrCalls* methods and
	// reported by ChannelzMetric.
	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
1136
1137// Note: this requires a lock on ac.mu.
1138func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1139 if ac.state == s {
1140 return
1141 }
1142 ac.state = s
1143 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
1144 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1145}
1146
1147// adjustParams updates parameters used to create transports upon
1148// receiving a GoAway.
1149func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1150 switch r {
1151 case transport.GoAwayTooManyPings:
1152 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1153 ac.cc.mu.Lock()
1154 if v > ac.cc.mkp.Time {
1155 ac.cc.mkp.Time = v
1156 }
1157 ac.cc.mu.Unlock()
1158 }
1159}
1160
// resetTransport makes one attempt to connect to the addrConn's addresses.
//
// It does nothing if ac is already SHUTDOWN. Otherwise it moves ac to
// CONNECTING and calls tryAllAddrs with a deadline of now plus the larger of
// the minimum connect timeout and the current backoff duration.
//
// On success the backoff index is reset to zero. On failure it asks the
// ClientConn to re-resolve, moves ac to TRANSIENT_FAILURE, waits out the
// backoff (which can be cut short by resetConnectBackoff or by ac.ctx being
// cancelled), and finally moves ac to IDLE unless it has been shut down.
func (ac *addrConn) resetTransport() {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}

	// Snapshot the address list and compute the backoff while holding ac.mu.
	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Capture the current reset channel under the lock;
		// resetConnectBackoff closes it to wake the select below.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// The full backoff elapsed: the next attempt backs off longer.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// Backoff was reset; backoffIdx has already been zeroed by
			// resetConnectBackoff.
			timer.Stop()
		case <-ac.ctx.Done():
			// The addrConn's context was cancelled; leave the state alone.
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1231
1232// tryAllAddrs tries to creates a connection to the addresses, and stop when at
1233// the first successful one. It returns an error if no address was successfully
1234// connected, or updates ac appropriately with the new transport.
1235func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
1236 var firstConnErr error
1237 for _, addr := range addrs {
1238 ac.mu.Lock()
1239 if ac.state == connectivity.Shutdown {
1240 ac.mu.Unlock()
1241 return errConnClosing
1242 }
1243
1244 ac.cc.mu.RLock()
1245 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1246 ac.cc.mu.RUnlock()
1247
1248 copts := ac.dopts.copts
1249 if ac.scopts.CredsBundle != nil {
1250 copts.CredsBundle = ac.scopts.CredsBundle
1251 }
1252 ac.mu.Unlock()
1253
1254 channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1255
1256 err := ac.createTransport(addr, copts, connectDeadline)
1257 if err == nil {
1258 return nil
1259 }
1260 if firstConnErr == nil {
1261 firstConnErr = err
1262 }
1263 ac.cc.updateConnectionError(err)
1264 }
1265
1266 // Couldn't connect to any address.
1267 return firstConnErr
1268}
1269
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
//
// The attempt is bounded by connectDeadline and by ac.ctx. After the transport
// is created, the function waits for whichever comes first: the server
// preface, the transport closing, or the connect context ending. Only when
// the preface arrives while ac is not SHUTDOWN and the transport has not
// already closed are ac.curAddr and ac.transport set and health checking
// started.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	// TODO: Delete prefaceReceived and move the logic to wait for it into the
	// transport.
	prefaceReceived := grpcsync.NewEvent()
	connClosed := grpcsync.NewEvent()

	// addr is a by-value copy, so this override is local to this attempt (and
	// to ac.curAddr if the attempt succeeds).
	addr.ServerName = ac.cc.getServerName(addr)
	hctx, hcancel := context.WithCancel(ac.ctx)
	hcStarted := false // protected by ac.mu

	// onClose is handed to the transport as its close callback.
	onClose := func() {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		defer connClosed.Fire()
		if !hcStarted || hctx.Err() != nil {
			// We didn't start the health check or set the state to READY, so
			// no need to do anything else here.
			//
			// OR, we have already cancelled the health check context, meaning
			// we have already called onClose once for this transport.  In this
			// case it would be dangerous to clear the transport and update the
			// state, since there may be a new transport in this addrConn.
			return
		}
		hcancel()
		ac.transport = nil
		// Refresh the name resolver
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, nil)
		}
	}

	// onGoAway is handed to the transport as its GoAway callback. The lock is
	// released before calling onClose because onClose acquires ac.mu itself.
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		ac.mu.Unlock()
		onClose()
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
		return err
	}

	select {
	case <-connectCtx.Done():
		// We didn't get the preface in time.
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		newTr.Close(transport.ErrConnClosing)
		if connectCtx.Err() == context.DeadlineExceeded {
			err := errors.New("failed to receive server preface within timeout")
			channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
			return err
		}
		// The context ended for a reason other than the deadline (i.e. it was
		// cancelled); this is not reported as a connection error.
		return nil
	case <-prefaceReceived.Done():
		// We got the preface - huzzah! things are good.
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if connClosed.HasFired() {
			// onClose called first; go idle but do nothing else.
			if ac.state != connectivity.Shutdown {
				ac.updateConnectivityState(connectivity.Idle, nil)
			}
			return nil
		}
		if ac.state == connectivity.Shutdown {
			// This can happen if the subConn was removed while in `Connecting`
			// state. tearDown() would have set the state to `Shutdown`, but
			// would not have closed the transport since ac.transport would not
			// been set at that point.
			//
			// We run this in a goroutine because newTr.Close() calls onClose()
			// inline, which requires locking ac.mu.
			//
			// The error we pass to Close() is immaterial since there are no open
			// streams at this point, so no trailers with error details will be sent
			// out. We just need to pass a non-nil error.
			go newTr.Close(transport.ErrConnClosing)
			return nil
		}
		ac.curAddr = addr
		ac.transport = newTr
		// From here on onClose will clear ac.transport and move ac to IDLE.
		hcStarted = true
		ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
		return nil
	case <-connClosed.Done():
		// The transport has already closed.  If we received the preface, too,
		// this is not an error.
		select {
		case <-prefaceReceived.Done():
			return nil
		default:
			return errors.New("connection closed before server preface received")
		}
	}
}
1381
1382// startHealthCheck starts the health checking stream (RPC) to watch the health
1383// stats of this connection if health checking is requested and configured.
1384//
1385// LB channel health checking is enabled when all requirements below are met:
1386// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1387// 2. internal.HealthCheckFunc is set by importing the grpc/health package
1388// 3. a service config with non-empty healthCheckConfig field is provided
1389// 4. the load balancer requests it
1390//
1391// It sets addrConn to READY if the health checking stream is not started.
1392//
1393// Caller must hold ac.mu.
1394func (ac *addrConn) startHealthCheck(ctx context.Context) {
1395 var healthcheckManagingState bool
1396 defer func() {
1397 if !healthcheckManagingState {
1398 ac.updateConnectivityState(connectivity.Ready, nil)
1399 }
1400 }()
1401
1402 if ac.cc.dopts.disableHealthCheck {
1403 return
1404 }
1405 healthCheckConfig := ac.cc.healthCheckConfig()
1406 if healthCheckConfig == nil {
1407 return
1408 }
1409 if !ac.scopts.HealthCheckEnabled {
1410 return
1411 }
1412 healthCheckFunc := ac.cc.dopts.healthCheckFunc
1413 if healthCheckFunc == nil {
1414 // The health package is not imported to set health check function.
1415 //
1416 // TODO: add a link to the health check doc in the error message.
1417 channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
1418 return
1419 }
1420
1421 healthcheckManagingState = true
1422
1423 // Set up the health check helper functions.
1424 currentTr := ac.transport
1425 newStream := func(method string) (interface{}, error) {
1426 ac.mu.Lock()
1427 if ac.transport != currentTr {
1428 ac.mu.Unlock()
1429 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1430 }
1431 ac.mu.Unlock()
1432 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1433 }
1434 setConnectivityState := func(s connectivity.State, lastErr error) {
1435 ac.mu.Lock()
1436 defer ac.mu.Unlock()
1437 if ac.transport != currentTr {
1438 return
1439 }
1440 ac.updateConnectivityState(s, lastErr)
1441 }
1442 // Start the health checking stream.
1443 go func() {
1444 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1445 if err != nil {
1446 if status.Code(err) == codes.Unimplemented {
1447 channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1448 } else {
1449 channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
1450 }
1451 }
1452 }()
1453}
1454
1455func (ac *addrConn) resetConnectBackoff() {
1456 ac.mu.Lock()
1457 close(ac.resetBackoff)
1458 ac.backoffIdx = 0
1459 ac.resetBackoff = make(chan struct{})
1460 ac.mu.Unlock()
1461}
1462
1463// getReadyTransport returns the transport if ac's state is READY or nil if not.
1464func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1465 ac.mu.Lock()
1466 defer ac.mu.Unlock()
1467 if ac.state == connectivity.Ready {
1468 return ac.transport
1469 }
1470 return nil
1471}
1472
1473// tearDown starts to tear down the addrConn.
1474//
1475// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
1476// will leak. In most cases, call cc.removeAddrConn() instead.
1477func (ac *addrConn) tearDown(err error) {
1478 ac.mu.Lock()
1479 if ac.state == connectivity.Shutdown {
1480 ac.mu.Unlock()
1481 return
1482 }
1483 curTr := ac.transport
1484 ac.transport = nil
1485 // We have to set the state to Shutdown before anything else to prevent races
1486 // between setting the state and logic that waits on context cancellation / etc.
1487 ac.updateConnectivityState(connectivity.Shutdown, nil)
1488 ac.cancel()
1489 ac.curAddr = resolver.Address{}
1490 if err == errConnDrain && curTr != nil {
1491 // GracefulClose(...) may be executed multiple times when
1492 // i) receiving multiple GoAway frames from the server; or
1493 // ii) there are concurrent name resolver/Balancer triggered
1494 // address removal and GoAway.
1495 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1496 ac.mu.Unlock()
1497 curTr.GracefulClose()
1498 ac.mu.Lock()
1499 }
1500 if channelz.IsOn() {
1501 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
1502 Desc: "Subchannel Deleted",
1503 Severity: channelz.CtInfo,
1504 Parent: &channelz.TraceEventDesc{
1505 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1506 Severity: channelz.CtInfo,
1507 },
1508 })
1509 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1510 // the entity being deleted, and thus prevent it from being deleted right away.
1511 channelz.RemoveEntry(ac.channelzID)
1512 }
1513 ac.mu.Unlock()
1514}
1515
1516func (ac *addrConn) getState() connectivity.State {
1517 ac.mu.Lock()
1518 defer ac.mu.Unlock()
1519 return ac.state
1520}
1521
1522func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1523 ac.mu.Lock()
1524 addr := ac.curAddr.Addr
1525 ac.mu.Unlock()
1526 return &channelz.ChannelInternalMetric{
1527 State: ac.getState(),
1528 Target: addr,
1529 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1530 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1531 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1532 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1533 }
1534}
1535
1536func (ac *addrConn) incrCallsStarted() {
1537 atomic.AddInt64(&ac.czData.callsStarted, 1)
1538 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1539}
1540
1541func (ac *addrConn) incrCallsSucceeded() {
1542 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1543}
1544
1545func (ac *addrConn) incrCallsFailed() {
1546 atomic.AddInt64(&ac.czData.callsFailed, 1)
1547}
1548
// retryThrottler is a token bucket used to decide whether retries should be
// throttled. max, thresh and ratio are fixed at construction; tokens is the
// current balance and is guarded by mu.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
//
// The balance never goes below zero, and the retry is throttled when the
// balance after the subtraction is at or below thresh. A nil receiver never
// throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capping the balance at
// max. It is a no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1585
// channelzChannel wraps a ClientConn so that its metrics can be reported
// through the ChannelzMetric method defined on this type.
type channelzChannel struct {
	cc *ClientConn // the ClientConn whose channelzMetric is reported
}
1589
1590func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1591 return c.cc.channelzMetric()
1592}
1593
// ErrClientConnTimeout indicates that the ClientConn was unable to establish
// its underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1600
1601func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1602 for _, rb := range cc.dopts.resolvers {
1603 if scheme == rb.Scheme() {
1604 return rb
1605 }
1606 }
1607 return resolver.Get(scheme)
1608}
1609
1610func (cc *ClientConn) updateConnectionError(err error) {
1611 cc.lceMu.Lock()
1612 cc.lastConnectionError = err
1613 cc.lceMu.Unlock()
1614}
1615
1616func (cc *ClientConn) connectionError() error {
1617 cc.lceMu.Lock()
1618 defer cc.lceMu.Unlock()
1619 return cc.lastConnectionError
1620}
khenaidoo5cb0d402021-12-08 14:09:16 -05001621
1622func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
1623 channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
1624
1625 var rb resolver.Builder
1626 parsedTarget, err := parseTarget(cc.target)
1627 if err != nil {
1628 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
1629 } else {
1630 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
1631 rb = cc.getResolver(parsedTarget.Scheme)
1632 if rb != nil {
1633 cc.parsedTarget = parsedTarget
1634 return rb, nil
1635 }
1636 }
1637
1638 // We are here because the user's dial target did not contain a scheme or
1639 // specified an unregistered scheme. We should fallback to the default
1640 // scheme, except when a custom dialer is specified in which case, we should
1641 // always use passthrough scheme.
1642 defScheme := resolver.GetDefaultScheme()
1643 channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
1644 canonicalTarget := defScheme + ":///" + cc.target
1645
1646 parsedTarget, err = parseTarget(canonicalTarget)
1647 if err != nil {
1648 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
1649 return nil, err
1650 }
1651 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
1652 rb = cc.getResolver(parsedTarget.Scheme)
1653 if rb == nil {
1654 return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
1655 }
1656 cc.parsedTarget = parsedTarget
1657 return rb, nil
1658}
1659
1660// parseTarget uses RFC 3986 semantics to parse the given target into a
1661// resolver.Target struct containing scheme, authority and endpoint. Query
1662// params are stripped from the endpoint.
1663func parseTarget(target string) (resolver.Target, error) {
1664 u, err := url.Parse(target)
1665 if err != nil {
1666 return resolver.Target{}, err
1667 }
1668 // For targets of the form "[scheme]://[authority]/endpoint, the endpoint
1669 // value returned from url.Parse() contains a leading "/". Although this is
1670 // in accordance with RFC 3986, we do not want to break existing resolver
1671 // implementations which expect the endpoint without the leading "/". So, we
1672 // end up stripping the leading "/" here. But this will result in an
1673 // incorrect parsing for something like "unix:///path/to/socket". Since we
1674 // own the "unix" resolver, we can workaround in the unix resolver by using
1675 // the `URL` field instead of the `Endpoint` field.
1676 endpoint := u.Path
1677 if endpoint == "" {
1678 endpoint = u.Opaque
1679 }
1680 endpoint = strings.TrimPrefix(endpoint, "/")
1681 return resolver.Target{
1682 Scheme: u.Scheme,
1683 Authority: u.Host,
1684 Endpoint: endpoint,
1685 URL: *u,
1686 }, nil
1687}
1688
1689// Determine channel authority. The order of precedence is as follows:
1690// - user specified authority override using `WithAuthority` dial option
1691// - creds' notion of server name for the authentication handshake
1692// - endpoint from dial target of the form "scheme://[authority]/endpoint"
1693func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) {
1694 // Historically, we had two options for users to specify the serverName or
1695 // authority for a channel. One was through the transport credentials
1696 // (either in its constructor, or through the OverrideServerName() method).
1697 // The other option (for cases where WithInsecure() dial option was used)
1698 // was to use the WithAuthority() dial option.
1699 //
1700 // A few things have changed since:
1701 // - `insecure` package with an implementation of the `TransportCredentials`
1702 // interface for the insecure case
1703 // - WithAuthority() dial option support for secure credentials
1704 authorityFromCreds := ""
1705 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1706 authorityFromCreds = creds.Info().ServerName
1707 }
1708 authorityFromDialOption := dopts.authority
1709 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1710 return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1711 }
1712
1713 switch {
1714 case authorityFromDialOption != "":
1715 return authorityFromDialOption, nil
1716 case authorityFromCreds != "":
1717 return authorityFromCreds, nil
1718 case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
1719 // TODO: remove when the unix resolver implements optional interface to
1720 // return channel authority.
1721 return "localhost", nil
1722 case strings.HasPrefix(endpoint, ":"):
1723 return "localhost" + endpoint, nil
1724 default:
1725 // TODO: Define an optional interface on the resolver builder to return
1726 // the channel authority given the user's dial target. For resolvers
1727 // which don't implement this interface, we will use the endpoint from
1728 // "scheme://authority/endpoint" as the default authority.
1729 return endpoint, nil
1730 }
1731}