blob: 5a9e7d754fe2b77d4b0a5107aa6f5d78ded03523 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
khenaidoo5cb0d402021-12-08 14:09:16 -050026 "net/url"
khenaidoo5fc5cea2021-08-11 17:39:16 -040027 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/internal/backoff"
39 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/grpcsync"
khenaidoo5fc5cea2021-08-11 17:39:16 -040041 iresolver "google.golang.org/grpc/internal/resolver"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
52)
53
// grpclbName must match grpclbName in grpclb/grpclb.go. When the current
// balancer has this name, grpclb-typed resolver addresses are not filtered out.
const grpclbName = "grpclb"

// minConnectTimeout is the minimum time to give a connection to complete.
const minConnectTimeout = 20 * time.Second
60
61var (
62 // ErrClientConnClosing indicates that the operation is illegal because
63 // the ClientConn is closing.
64 //
65 // Deprecated: this error should not be relied upon by users; use the status
66 // code of Canceled instead.
67 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69 errConnDrain = errors.New("grpc: the connection is drained")
70 // errConnClosing indicates that the connection is closing.
71 errConnClosing = errors.New("grpc: the connection is closing")
72 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
73 // service config.
74 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
75)
76
// Errors returned from Dial and DialContext when the transport security
// settings in the dial options are missing or conflicting.
var (
	// errNoTransportSecurity: the ClientConn has no transport security. Users
	// must either configure credentials or explicitly opt out with the
	// WithInsecure DialOption.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle: a credentials.Bundle was combined with
	// individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing: per-RPC credentials that carry security
	// information (e.g. an OAuth2 token) require a secure connection, but the
	// connection is insecure.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict: grpc.WithTransportCredentials() and
	// grpc.WithInsecure() were both used for the same connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
94
// Default message-size limits and frame buffer sizes for a ClientConn.
const (
	// defaultClientMaxReceiveMessageSize is the default receive limit: 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 << 20
	// defaultClientMaxSendMessageSize is the default send limit, math.MaxInt32.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize is the default buffer size for sending frames (32 KiB).
	defaultWriteBufSize = 32 << 10
	// defaultReadBufSize is the default read buffer size (32 KiB).
	defaultReadBufSize = 32 << 10
)
102
103// Dial creates a client connection to the given target.
104func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105 return DialContext(context.Background(), target, opts...)
106}
107
108type defaultConfigSelector struct {
109 sc *ServiceConfig
110}
111
112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
113 return &iresolver.RPCConfig{
114 Context: rpcInfo.Context,
115 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116 }, nil
117}
118
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// Whenever DialContext returns a non-nil error, the partially built
// ClientConn is closed and a nil *ClientConn is returned.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed retryThrottler with a typed nil so the atomic.Value is never empty.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// Until a service config is known, RPCs are served from a nil service
	// config, for which getMethodConfig yields an empty MethodConfig.
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	// cc.ctx is derived from context.Background() rather than ctx. The
	// ClientConn's lifetime is governed by cc.cancel, not by the dial context.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Fold the configured interceptors into dopts.unaryInt / dopts.streamInt.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// On any error return, close whatever has been set up so far. Defers run
	// LIFO, so this one runs after the ctx-checking defer registered below. It
	// therefore also sees the errors that defer substitutes.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register the channel with channelz. It is nested under the parent
	// channel when one was configured.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the transport security settings:
	//   - a secure dial needs exactly one of TransportCredentials and
	//     CredsBundle;
	//   - an insecure dial must set neither, and must not carry per-RPC
	//     credentials that require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the user-provided default service config now, so that an invalid
	// one fails the dial.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Append the grpc-go user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// A dial timeout option narrows ctx for the rest of the setup.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done when DialContext returns, the result is always a nil
	// conn, even if setup succeeded. The error is:
	//   - err unchanged, when err already equals ctx.Err();
	//   - ctx.Err(), when there was no error or returnLastError is unset;
	//   - "ctx.Err(): err", otherwise.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config without blocking.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	if err != nil {
		return nil, err
	}
	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)
	if err != nil {
		return nil, err
	}
	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep applying later service configs in the background. scWatcher exits
	// when scChan is closed or cc.ctx is done.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Give the balancer its own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			cc.Connect()
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early when the last connection error reports itself
				// as non-temporary via a Temporary() method.
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
328
329// chainUnaryClientInterceptors chains all unary client interceptors into one.
330func chainUnaryClientInterceptors(cc *ClientConn) {
331 interceptors := cc.dopts.chainUnaryInts
332 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
333 // be executed before any other chained interceptors.
334 if cc.dopts.unaryInt != nil {
335 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
336 }
337 var chainedInt UnaryClientInterceptor
338 if len(interceptors) == 0 {
339 chainedInt = nil
340 } else if len(interceptors) == 1 {
341 chainedInt = interceptors[0]
342 } else {
343 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
344 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
345 }
346 }
347 cc.dopts.unaryInt = chainedInt
348}
349
350// getChainUnaryInvoker recursively generate the chained unary invoker.
351func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
352 if curr == len(interceptors)-1 {
353 return finalInvoker
354 }
355 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
356 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
357 }
358}
359
360// chainStreamClientInterceptors chains all stream client interceptors into one.
361func chainStreamClientInterceptors(cc *ClientConn) {
362 interceptors := cc.dopts.chainStreamInts
363 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
364 // be executed before any other chained interceptors.
365 if cc.dopts.streamInt != nil {
366 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
367 }
368 var chainedInt StreamClientInterceptor
369 if len(interceptors) == 0 {
370 chainedInt = nil
371 } else if len(interceptors) == 1 {
372 chainedInt = interceptors[0]
373 } else {
374 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
375 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
376 }
377 }
378 cc.dopts.streamInt = chainedInt
379}
380
381// getChainStreamer recursively generate the chained client stream constructor.
382func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
383 if curr == len(interceptors)-1 {
384 return finalStreamer
385 }
386 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
387 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
388 }
389}
390
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu guards state and notifyChan.
	mu sync.Mutex
	// state is the current channel state. Once it is Shutdown, updateState
	// never changes it again.
	state connectivity.State
	// notifyChan is created lazily by getNotifyChan. On every state change,
	// updateState closes it (waking all waiters) and resets it to nil.
	notifyChan chan struct{}
	// channelzID is the owning channel's channelz ID, used when logging state
	// transitions.
	channelzID int64
}
399
400// updateState updates the connectivity.State of ClientConn.
401// If there's a change it notifies goroutines waiting on state change to
402// happen.
403func (csm *connectivityStateManager) updateState(state connectivity.State) {
404 csm.mu.Lock()
405 defer csm.mu.Unlock()
406 if csm.state == connectivity.Shutdown {
407 return
408 }
409 if csm.state == state {
410 return
411 }
412 csm.state = state
413 channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
414 if csm.notifyChan != nil {
415 // There are other goroutines waiting on this channel.
416 close(csm.notifyChan)
417 csm.notifyChan = nil
418 }
419}
420
421func (csm *connectivityStateManager) getState() connectivity.State {
422 csm.mu.Lock()
423 defer csm.mu.Unlock()
424 return csm.state
425}
426
427func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
428 csm.mu.Lock()
429 defer csm.mu.Unlock()
430 if csm.notifyChan == nil {
431 csm.notifyChan = make(chan struct{})
432 }
433 return csm.notifyChan
434}
435
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC described by desc for method.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Assert at compile time that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
449
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx is canceled through cancel. It parents every addrConn's context
	// (see newAddrConn), and its cancellation stops scWatcher.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the dial target exactly as passed to DialContext.
	target string
	// parsedTarget is filled in while picking the resolver in DialContext.
	parsedTarget resolver.Target
	// authority is the default handshake server name (see getServerName).
	authority string
	dopts     dialOptions
	csMgr     *connectivityStateManager

	// balancerBuildOpts is passed to every balancer built by switchBalancer.
	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	safeConfigSelector iresolver.SafeConfigSelector

	// mu guards the fields below it, e.g. resolverWrapper, sc, conns,
	// curBalancerName and balancerWrapper. conns is set to nil when the
	// ClientConn is closed, which callers use as the "closed" signal.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	// firstResolveEvent fires after the first resolver update has been
	// processed (see updateResolverState and waitForResolvedAddrs).
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
495
496// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
497// ctx expires. A true value is returned in former case and false in latter.
498//
499// Experimental
500//
501// Notice: This API is EXPERIMENTAL and may be changed or removed in a
502// later release.
503func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
504 ch := cc.csMgr.getNotifyChan()
505 if cc.csMgr.getState() != sourceState {
506 return true
507 }
508 select {
509 case <-ctx.Done():
510 return false
511 case <-ch:
512 return true
513 }
514}
515
516// GetState returns the connectivity.State of ClientConn.
517//
518// Experimental
519//
520// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
521// release.
522func (cc *ClientConn) GetState() connectivity.State {
523 return cc.csMgr.getState()
524}
525
526// Connect causes all subchannels in the ClientConn to attempt to connect if
527// the channel is idle. Does not wait for the connection attempts to begin
528// before returning.
529//
530// Experimental
531//
532// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
533// release.
534func (cc *ClientConn) Connect() {
535 cc.mu.Lock()
536 defer cc.mu.Unlock()
537 if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
538 return
539 }
540 for ac := range cc.conns {
541 go ac.connect()
542 }
543}
544
545func (cc *ClientConn) scWatcher() {
546 for {
547 select {
548 case sc, ok := <-cc.dopts.scChan:
549 if !ok {
550 return
551 }
552 cc.mu.Lock()
553 // TODO: load balance policy runtime change is ignored.
554 // We may revisit this decision in the future.
555 cc.sc = &sc
556 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
557 cc.mu.Unlock()
558 case <-cc.ctx.Done():
559 return
560 }
561 }
562}
563
564// waitForResolvedAddrs blocks until the resolver has provided addresses or the
565// context expires. Returns nil unless the context expires first; otherwise
566// returns a status error based on the context.
567func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
568 // This is on the RPC path, so we use a fast path to avoid the
569 // more-expensive "select" below after the resolver has returned once.
570 if cc.firstResolveEvent.HasFired() {
571 return nil
572 }
573 select {
574 case <-cc.firstResolveEvent.Done():
575 return nil
576 case <-ctx.Done():
577 return status.FromContextError(ctx.Err()).Err()
578 case <-cc.ctx.Done():
579 return ErrClientConnClosing
580 }
581}
582
583var emptyServiceConfig *ServiceConfig
584
585func init() {
586 cfg := parseServiceConfig("{}")
587 if cfg.Err != nil {
588 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
589 }
590 emptyServiceConfig = cfg.Config.(*ServiceConfig)
591}
592
593func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
594 if cc.sc != nil {
595 cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
596 return
597 }
598 if cc.dopts.defaultServiceConfig != nil {
599 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
600 } else {
601 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
602 }
603}
604
605func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
606 defer cc.firstResolveEvent.Fire()
607 cc.mu.Lock()
608 // Check if the ClientConn is already closed. Some fields (e.g.
609 // balancerWrapper) are set to nil when closing the ClientConn, and could
610 // cause nil pointer panic if we don't have this check.
611 if cc.conns == nil {
612 cc.mu.Unlock()
613 return nil
614 }
615
616 if err != nil {
617 // May need to apply the initial service config in case the resolver
618 // doesn't support service configs, or doesn't provide a service config
619 // with the new addresses.
620 cc.maybeApplyDefaultServiceConfig(nil)
621
622 if cc.balancerWrapper != nil {
623 cc.balancerWrapper.resolverError(err)
624 }
625
626 // No addresses are valid with err set; return early.
627 cc.mu.Unlock()
628 return balancer.ErrBadResolverState
629 }
630
631 var ret error
632 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
633 cc.maybeApplyDefaultServiceConfig(s.Addresses)
634 // TODO: do we need to apply a failing LB policy if there is no
635 // default, per the error handling design?
636 } else {
637 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
638 configSelector := iresolver.GetConfigSelector(s)
639 if configSelector != nil {
640 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
641 channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
642 }
643 } else {
644 configSelector = &defaultConfigSelector{sc}
645 }
646 cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
647 } else {
648 ret = balancer.ErrBadResolverState
649 if cc.balancerWrapper == nil {
650 var err error
651 if s.ServiceConfig.Err != nil {
652 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
653 } else {
654 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
655 }
656 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
657 cc.blockingpicker.updatePicker(base.NewErrPicker(err))
658 cc.csMgr.updateState(connectivity.TransientFailure)
659 cc.mu.Unlock()
660 return ret
661 }
662 }
663 }
664
665 var balCfg serviceconfig.LoadBalancingConfig
666 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
667 balCfg = cc.sc.lbConfig.cfg
668 }
669
670 cbn := cc.curBalancerName
671 bw := cc.balancerWrapper
672 cc.mu.Unlock()
673 if cbn != grpclbName {
674 // Filter any grpclb addresses since we don't have the grpclb balancer.
675 for i := 0; i < len(s.Addresses); {
676 if s.Addresses[i].Type == resolver.GRPCLB {
677 copy(s.Addresses[i:], s.Addresses[i+1:])
678 s.Addresses = s.Addresses[:len(s.Addresses)-1]
679 continue
680 }
681 i++
682 }
683 }
684 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
685 if ret == nil {
686 ret = uccsErr // prefer ErrBadResolver state since any other error is
687 // currently meaningless to the caller.
688 }
689 return ret
690}
691
// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// The name comparison is case-insensitive. The switch is ignored when a
// Balancer DialOption was used. A name with no registered builder falls back
// to pick_first.
//
// Caller must hold cc.mu. Note that cc.mu is released while the old balancer
// is being closed and re-acquired afterwards.
func (cc *ClientConn) switchBalancer(name string) {
	// Nothing to do if the requested balancer is already current.
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		// Don't hold cc.mu while closing the balancers. The balancers may call
		// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
		// would cause a deadlock in that case.
		cc.mu.Unlock()
		cc.balancerWrapper.close()
		cc.mu.Lock()
	}

	builder := balancer.Get(name)
	if builder == nil {
		// Unknown balancer name: fall back to pick_first.
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	} else {
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
	}

	// Record the builder's own name, which may differ from name in case (or
	// entirely, after the pick_first fallback).
	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}
731
732func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
733 cc.mu.Lock()
734 if cc.conns == nil {
735 cc.mu.Unlock()
736 return
737 }
738 // TODO(bar switching) send updates to all balancer wrappers when balancer
739 // gracefully switching is supported.
740 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
741 cc.mu.Unlock()
742}
743
744// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
745//
746// Caller needs to make sure len(addrs) > 0.
747func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
748 ac := &addrConn{
749 state: connectivity.Idle,
750 cc: cc,
751 addrs: addrs,
752 scopts: opts,
753 dopts: cc.dopts,
754 czData: new(channelzData),
755 resetBackoff: make(chan struct{}),
756 }
757 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
758 // Track ac in cc. This needs to be done before any getTransport(...) is called.
759 cc.mu.Lock()
760 if cc.conns == nil {
761 cc.mu.Unlock()
762 return nil, ErrClientConnClosing
763 }
764 if channelz.IsOn() {
765 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
766 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
767 Desc: "Subchannel Created",
768 Severity: channelz.CtInfo,
769 Parent: &channelz.TraceEventDesc{
770 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
771 Severity: channelz.CtInfo,
772 },
773 })
774 }
775 cc.conns[ac] = struct{}{}
776 cc.mu.Unlock()
777 return ac, nil
778}
779
780// removeAddrConn removes the addrConn in the subConn from clientConn.
781// It also tears down the ac with the given error.
782func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
783 cc.mu.Lock()
784 if cc.conns == nil {
785 cc.mu.Unlock()
786 return
787 }
788 delete(cc.conns, ac)
789 cc.mu.Unlock()
790 ac.tearDown(err)
791}
792
793func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
794 return &channelz.ChannelInternalMetric{
795 State: cc.GetState(),
796 Target: cc.target,
797 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
798 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
799 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
800 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
801 }
802}
803
804// Target returns the target string of the ClientConn.
805//
806// Experimental
807//
808// Notice: This API is EXPERIMENTAL and may be changed or removed in a
809// later release.
810func (cc *ClientConn) Target() string {
811 return cc.target
812}
813
814func (cc *ClientConn) incrCallsStarted() {
815 atomic.AddInt64(&cc.czData.callsStarted, 1)
816 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
817}
818
819func (cc *ClientConn) incrCallsSucceeded() {
820 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
821}
822
823func (cc *ClientConn) incrCallsFailed() {
824 atomic.AddInt64(&cc.czData.callsFailed, 1)
825}
826
827// connect starts creating a transport.
828// It does nothing if the ac is not IDLE.
829// TODO(bar) Move this to the addrConn section.
830func (ac *addrConn) connect() error {
831 ac.mu.Lock()
832 if ac.state == connectivity.Shutdown {
833 ac.mu.Unlock()
834 return errConnClosing
835 }
836 if ac.state != connectivity.Idle {
837 ac.mu.Unlock()
838 return nil
839 }
840 // Update connectivity state within the lock to prevent subsequent or
841 // concurrent calls from resetting the transport more than once.
842 ac.updateConnectivityState(connectivity.Connecting, nil)
843 ac.mu.Unlock()
844
845 ac.resetTransport()
846 return nil
847}
848
849// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
850//
851// If ac is Connecting, it returns false. The caller should tear down the ac and
852// create a new one. Note that the backoff will be reset when this happens.
853//
854// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
855// addresses will be picked up by retry in the next iteration after backoff.
856//
857// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
858//
859// If ac is Ready, it checks whether current connected address of ac is in the
860// new addrs list.
861// - If true, it updates ac.addrs and returns true. The ac will keep using
862// the existing connection.
863// - If false, it does nothing and returns false.
864func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
865 ac.mu.Lock()
866 defer ac.mu.Unlock()
867 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
868 if ac.state == connectivity.Shutdown ||
869 ac.state == connectivity.TransientFailure ||
870 ac.state == connectivity.Idle {
871 ac.addrs = addrs
872 return true
873 }
874
875 if ac.state == connectivity.Connecting {
876 return false
877 }
878
879 // ac.state is Ready, try to find the connected address.
880 var curAddrFound bool
881 for _, a := range addrs {
khenaidoo5cb0d402021-12-08 14:09:16 -0500882 a.ServerName = ac.cc.getServerName(a)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400883 if reflect.DeepEqual(ac.curAddr, a) {
884 curAddrFound = true
885 break
886 }
887 }
888 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
889 if curAddrFound {
890 ac.addrs = addrs
891 }
892
893 return curAddrFound
894}
895
khenaidoo5cb0d402021-12-08 14:09:16 -0500896// getServerName determines the serverName to be used in the connection
897// handshake. The default value for the serverName is the authority on the
898// ClientConn, which either comes from the user's dial target or through an
899// authority override specified using the WithAuthority dial option. Name
900// resolvers can specify a per-address override for the serverName through the
901// resolver.Address.ServerName field which is used only if the WithAuthority
902// dial option was not used. The rationale is that per-address authority
903// overrides specified by the name resolver can represent a security risk, while
904// an override specified by the user is more dependable since they probably know
905// what they are doing.
906func (cc *ClientConn) getServerName(addr resolver.Address) string {
907 if cc.dopts.authority != "" {
908 return cc.dopts.authority
909 }
910 if addr.ServerName != "" {
911 return addr.ServerName
912 }
913 return cc.authority
914}
915
khenaidoo5fc5cea2021-08-11 17:39:16 -0400916func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
917 if sc == nil {
918 return MethodConfig{}
919 }
920 if m, ok := sc.Methods[method]; ok {
921 return m
922 }
923 i := strings.LastIndex(method, "/")
924 if m, ok := sc.Methods[method[:i+1]]; ok {
925 return m
926 }
927 return sc.Methods[""]
928}
929
930// GetMethodConfig gets the method config of the input method.
931// If there's an exact match for input method (i.e. /service/method), we return
932// the corresponding MethodConfig.
933// If there isn't an exact match for the input method, we look for the service's default
934// config under the service (i.e /service/) and then for the default for all services (empty string).
935//
936// If there is a default MethodConfig for the service, we return it.
937// Otherwise, we return an empty MethodConfig.
938func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
939 // TODO: Avoid the locking here.
940 cc.mu.RLock()
941 defer cc.mu.RUnlock()
942 return getMethodConfig(cc.sc, method)
943}
944
945func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
946 cc.mu.RLock()
947 defer cc.mu.RUnlock()
948 if cc.sc == nil {
949 return nil
950 }
951 return cc.sc.healthCheckConfig
952}
953
954func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
955 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
956 Ctx: ctx,
957 FullMethodName: method,
958 })
959 if err != nil {
960 return nil, nil, toRPCErr(err)
961 }
962 return t, done, nil
963}
964
965func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
966 if sc == nil {
967 // should never reach here.
968 return
969 }
970 cc.sc = sc
971 if configSelector != nil {
972 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
973 }
974
975 if cc.sc.retryThrottling != nil {
976 newThrottler := &retryThrottler{
977 tokens: cc.sc.retryThrottling.MaxTokens,
978 max: cc.sc.retryThrottling.MaxTokens,
979 thresh: cc.sc.retryThrottling.MaxTokens / 2,
980 ratio: cc.sc.retryThrottling.TokenRatio,
981 }
982 cc.retryThrottler.Store(newThrottler)
983 } else {
984 cc.retryThrottler.Store((*retryThrottler)(nil))
985 }
986
987 if cc.dopts.balancerBuilder == nil {
988 // Only look at balancer types and switch balancer if balancer dial
989 // option is not set.
990 var newBalancerName string
991 if cc.sc != nil && cc.sc.lbConfig != nil {
992 newBalancerName = cc.sc.lbConfig.name
993 } else {
994 var isGRPCLB bool
995 for _, a := range addrs {
996 if a.Type == resolver.GRPCLB {
997 isGRPCLB = true
998 break
999 }
1000 }
1001 if isGRPCLB {
1002 newBalancerName = grpclbName
1003 } else if cc.sc != nil && cc.sc.LB != nil {
1004 newBalancerName = *cc.sc.LB
1005 } else {
1006 newBalancerName = PickFirstBalancerName
1007 }
1008 }
1009 cc.switchBalancer(newBalancerName)
1010 } else if cc.balancerWrapper == nil {
1011 // Balancer dial option was set, and this is the first time handling
1012 // resolved addresses. Build a balancer with dopts.balancerBuilder.
1013 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
1014 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
1015 }
1016}
1017
1018func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1019 cc.mu.RLock()
1020 r := cc.resolverWrapper
1021 cc.mu.RUnlock()
1022 if r == nil {
1023 return
1024 }
1025 go r.resolveNow(o)
1026}
1027
1028// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1029// them to attempt another connection immediately. It also resets the backoff
1030// times used for subsequent attempts regardless of the current state.
1031//
1032// In general, this function should not be used. Typical service or network
1033// outages result in a reasonable client reconnection strategy by default.
1034// However, if a previously unavailable network becomes available, this may be
1035// used to trigger an immediate reconnect.
1036//
1037// Experimental
1038//
1039// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1040// later release.
1041func (cc *ClientConn) ResetConnectBackoff() {
1042 cc.mu.Lock()
1043 conns := cc.conns
1044 cc.mu.Unlock()
1045 for ac := range conns {
1046 ac.resetConnectBackoff()
1047 }
1048}
1049
// Close tears down the ClientConn and all underlying connections.
//
// Calling Close on a ClientConn that has already been closed (cc.conns is nil)
// returns ErrClientConnClosing. Otherwise Close:
//   - marks the channel Shutdown;
//   - closes the blocking picker, the balancer wrapper and the resolver
//     wrapper;
//   - tears down every addrConn with ErrClientConnClosing;
//   - when channelz is on, records a deletion trace event and then removes
//     the channel's channelz entry.
//
// cc.cancel is invoked on every return path via defer.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	// A nil conns map marks a ClientConn that has already been closed.
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach the addrConn set and both wrappers while holding cc.mu, so any
	// later Close (or other code checking cc.conns) observes the closed state.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	// The remaining teardown runs without cc.mu held.
	cc.blockingpicker.close()

	if bWrapper != nil {
		bWrapper.close()
	}
	if rWrapper != nil {
		rWrapper.close()
	}

	// NOTE(review): the detached map is ranged without cc.mu. This relies on
	// every writer treating a nil cc.conns as "closed" and no longer touching
	// the old map — confirm for all mutators.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtInfo,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtInfo,
			}
		}
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1099
// addrConn is a network connection to a given address.
//
// It holds at most one transport at a time (the transport field) and tracks
// its own connectivity state, reporting each change to the owning ClientConn
// via updateConnectivityState.
type addrConn struct {
	// ctx bounds the addrConn's lifetime; cancel is called by tearDown.
	// Connection attempts and health checks derive their contexts from ctx.
	ctx    context.Context
	cancel context.CancelFunc

	cc *ClientConn // the owning ClientConn
	// dopts is this addrConn's copy of the dial options. tryAllAddrs refreshes
	// dopts.copts.KeepaliveParams from cc.mkp before each connection attempt.
	dopts dialOptions
	// acbw is the SubConn handed to cc.handleSubConnStateChange on every
	// connectivity state change.
	acbw balancer.SubConn
	// scopts holds the options this SubConn was created with; CredsBundle and
	// HealthCheckEnabled are consulted when connecting / starting health checks.
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	// Every read and write of transport in this file happens with mu held.
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced) by resetConnectBackoff to cut short
	// a backoff wait in resetTransport.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	// czData holds call counters; they are updated and read atomically.
	czData *channelzData
}
1129
1130// Note: this requires a lock on ac.mu.
1131func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1132 if ac.state == s {
1133 return
1134 }
1135 ac.state = s
1136 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
1137 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1138}
1139
1140// adjustParams updates parameters used to create transports upon
1141// receiving a GoAway.
1142func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1143 switch r {
1144 case transport.GoAwayTooManyPings:
1145 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1146 ac.cc.mu.Lock()
1147 if v > ac.cc.mkp.Time {
1148 ac.cc.mkp.Time = v
1149 }
1150 ac.cc.mu.Unlock()
1151 }
1152}
1153
// resetTransport makes one pass at (re)connecting ac.
//
// Unless ac is already Shutdown, it moves ac to Connecting and tries the
// addresses in ac.addrs (via tryAllAddrs) under a single shared connect
// deadline. The dial budget is minConnectTimeout, or
// dopts.minConnectTimeout() when set, raised to the current backoff duration
// if that is longer.
//
// On success the backoff index is reset to zero. On failure, resetTransport:
//  1. asks the resolver to re-resolve;
//  2. moves ac to TransientFailure;
//  3. waits for the backoff duration.
//
// The wait ends early if resetConnectBackoff closes the resetBackoff channel.
// resetTransport returns immediately if ac.ctx is cancelled. Only a backoff
// that runs to completion advances backoffIdx. After the wait, ac is moved to
// Idle unless it has been shut down.
//
// resetTransport acquires ac.mu itself, so the caller must not hold it.
//
// NOTE(review): tryAllAddrs returns nil for an empty address list, so an empty
// ac.addrs takes the success path here and leaves ac in Connecting — confirm
// that callers never reach this with no addresses.
func (ac *addrConn) resetTransport() {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}

	// Snapshot the inputs for this pass while holding ac.mu.
	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Capture the channel under ac.mu: resetConnectBackoff closes this
		// exact channel (and installs a fresh one) to wake us early.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// Full backoff elapsed: the next attempt uses a longer backoff.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// resetConnectBackoff already zeroed backoffIdx.
			timer.Stop()
		case <-ac.ctx.Done():
			timer.Stop()
			return
		}

		ac.mu.Lock()
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}
1224
1225// tryAllAddrs tries to creates a connection to the addresses, and stop when at
1226// the first successful one. It returns an error if no address was successfully
1227// connected, or updates ac appropriately with the new transport.
1228func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
1229 var firstConnErr error
1230 for _, addr := range addrs {
1231 ac.mu.Lock()
1232 if ac.state == connectivity.Shutdown {
1233 ac.mu.Unlock()
1234 return errConnClosing
1235 }
1236
1237 ac.cc.mu.RLock()
1238 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1239 ac.cc.mu.RUnlock()
1240
1241 copts := ac.dopts.copts
1242 if ac.scopts.CredsBundle != nil {
1243 copts.CredsBundle = ac.scopts.CredsBundle
1244 }
1245 ac.mu.Unlock()
1246
1247 channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1248
1249 err := ac.createTransport(addr, copts, connectDeadline)
1250 if err == nil {
1251 return nil
1252 }
1253 if firstConnErr == nil {
1254 firstConnErr = err
1255 }
1256 ac.cc.updateConnectionError(err)
1257 }
1258
1259 // Couldn't connect to any address.
1260 return firstConnErr
1261}
1262
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
//
// Before dialing, addr.ServerName is replaced by cc.getServerName(addr). Both
// the dial and the wait for the server preface are bounded by connectDeadline
// and by ac.ctx. Outcomes:
//   - The dial fails: the dial error is returned.
//   - connectCtx ends before the preface arrives: the transport is closed. An
//     error is returned if the deadline was hit; if connectCtx ended for
//     another reason (ac.ctx cancelled), nil is returned.
//   - The preface is received: nil is returned in each of three sub-cases.
//     If the transport has already closed, ac goes Idle (unless Shutdown).
//     If ac was shut down meanwhile, the transport is closed on a separate
//     goroutine. Otherwise ac.curAddr and ac.transport are set and health
//     checking is started, which moves ac to READY when appropriate.
//   - The transport closes first: nil is returned if the preface had also been
//     received, otherwise an error.
//
// The onClose callback clears ac.transport, triggers re-resolution and moves
// ac to Idle. It does this only for a transport that got as far as starting
// health checks, and only on the first invocation.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	// TODO: Delete prefaceReceived and move the logic to wait for it into the
	// transport.
	prefaceReceived := grpcsync.NewEvent()
	connClosed := grpcsync.NewEvent()

	addr.ServerName = ac.cc.getServerName(addr)
	// hctx scopes the health check for this particular transport; cancelling
	// it (in onClose) also marks that onClose has already run once.
	// NOTE(review): if health checking never starts, hcancel is only released
	// when ac.ctx is cancelled.
	hctx, hcancel := context.WithCancel(ac.ctx)
	hcStarted := false // protected by ac.mu

	onClose := func() {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		defer connClosed.Fire()
		if !hcStarted || hctx.Err() != nil {
			// We didn't start the health check or set the state to READY, so
			// no need to do anything else here.
			//
			// OR, we have already cancelled the health check context, meaning
			// we have already called onClose once for this transport. In this
			// case it would be dangerous to clear the transport and update the
			// state, since there may be a new transport in this addrConn.
			return
		}
		hcancel()
		ac.transport = nil
		// Refresh the name resolver
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, nil)
		}
	}

	// On GoAway, record any parameter adjustment (e.g. keepalive backoff for
	// too many pings) before treating the transport as closed.
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		ac.mu.Unlock()
		onClose()
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
		return err
	}

	select {
	case <-connectCtx.Done():
		// We didn't get the preface in time.
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		newTr.Close(transport.ErrConnClosing)
		if connectCtx.Err() == context.DeadlineExceeded {
			err := errors.New("failed to receive server preface within timeout")
			channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
			return err
		}
		// Not a deadline: ac.ctx was cancelled (ac is being torn down).
		return nil
	case <-prefaceReceived.Done():
		// We got the preface - huzzah! things are good.
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if connClosed.HasFired() {
			// onClose called first; go idle but do nothing else.
			if ac.state != connectivity.Shutdown {
				ac.updateConnectivityState(connectivity.Idle, nil)
			}
			return nil
		}
		if ac.state == connectivity.Shutdown {
			// This can happen if the subConn was removed while in `Connecting`
			// state. tearDown() would have set the state to `Shutdown`, but
			// would not have closed the transport since ac.transport would not
			// been set at that point.
			//
			// We run this in a goroutine because newTr.Close() calls onClose()
			// inline, which requires locking ac.mu.
			//
			// The error we pass to Close() is immaterial since there are no open
			// streams at this point, so no trailers with error details will be sent
			// out. We just need to pass a non-nil error.
			go newTr.Close(transport.ErrConnClosing)
			return nil
		}
		ac.curAddr = addr
		ac.transport = newTr
		hcStarted = true
		ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
		return nil
	case <-connClosed.Done():
		// The transport has already closed. If we received the preface, too,
		// this is not an error.
		select {
		case <-prefaceReceived.Done():
			return nil
		default:
			return errors.New("connection closed before server preface received")
		}
	}
}
1374
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
// When the stream is started, state changes come from the health check
// function instead, and are applied only while ac.transport is still the
// transport that was current when health checking began.
//
// ctx bounds the health checking stream; cancelling it stops the check.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// Unless health checking ends up owning the state, fall through to READY
	// on every return path below.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers are pinned to currentTr: once ac.transport changes, they
	// refuse to open streams or update state on behalf of the old transport.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	// NOTE(review): this calls ac.cc.dopts.healthCheckFunc rather than the
	// healthCheckFunc local checked above; they hold the same value here.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1447
1448func (ac *addrConn) resetConnectBackoff() {
1449 ac.mu.Lock()
1450 close(ac.resetBackoff)
1451 ac.backoffIdx = 0
1452 ac.resetBackoff = make(chan struct{})
1453 ac.mu.Unlock()
1454}
1455
1456// getReadyTransport returns the transport if ac's state is READY or nil if not.
1457func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1458 ac.mu.Lock()
1459 defer ac.mu.Unlock()
1460 if ac.state == connectivity.Ready {
1461 return ac.transport
1462 }
1463 return nil
1464}
1465
// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
//
// It is a no-op if ac is already Shutdown. Otherwise it:
//   - detaches the current transport;
//   - moves ac to Shutdown;
//   - cancels ac.ctx;
//   - clears ac.curAddr.
//
// When err is errConnDrain and a transport was present, that transport is
// closed gracefully. For any other err, tearDown itself does not close the
// detached transport (presumably it is closed elsewhere, e.g. through ac.cc.ctx
// passed to NewClientTransport — confirm). With channelz on, a deletion trace
// event is recorded and the subchannel's channelz entry removed.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		// NOTE(review): "Subchanel" in the parent description below is
		// misspelled. It is a runtime trace string, so it is left unchanged
		// here in case anything matches on it.
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtInfo,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtInfo,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1508
1509func (ac *addrConn) getState() connectivity.State {
1510 ac.mu.Lock()
1511 defer ac.mu.Unlock()
1512 return ac.state
1513}
1514
1515func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1516 ac.mu.Lock()
1517 addr := ac.curAddr.Addr
1518 ac.mu.Unlock()
1519 return &channelz.ChannelInternalMetric{
1520 State: ac.getState(),
1521 Target: addr,
1522 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1523 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1524 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1525 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1526 }
1527}
1528
1529func (ac *addrConn) incrCallsStarted() {
1530 atomic.AddInt64(&ac.czData.callsStarted, 1)
1531 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1532}
1533
1534func (ac *addrConn) incrCallsSucceeded() {
1535 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1536}
1537
1538func (ac *addrConn) incrCallsFailed() {
1539 atomic.AddInt64(&ac.czData.callsFailed, 1)
1540}
1541
// retryThrottler is a token pool used for retry throttling as configured by
// the service config. The pool holds between 0 and max tokens:
//   - each call to throttle removes one token and reports whether retries
//     should be throttled (pool at or below thresh);
//   - each call to successfulRPC adds ratio tokens.
//
// A nil *retryThrottler is valid and never throttles.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool is clamped so it never drops below zero.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capping it at max. It is a
// no-op on a nil throttler.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if refilled := rt.tokens + rt.ratio; refilled > rt.max {
		rt.tokens = rt.max
	} else {
		rt.tokens = refilled
	}
}
1578
1579type channelzChannel struct {
1580 cc *ClientConn
1581}
1582
1583func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1584 return c.cc.channelzMetric()
1585}
1586
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)
1593
1594func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1595 for _, rb := range cc.dopts.resolvers {
1596 if scheme == rb.Scheme() {
1597 return rb
1598 }
1599 }
1600 return resolver.Get(scheme)
1601}
1602
1603func (cc *ClientConn) updateConnectionError(err error) {
1604 cc.lceMu.Lock()
1605 cc.lastConnectionError = err
1606 cc.lceMu.Unlock()
1607}
1608
1609func (cc *ClientConn) connectionError() error {
1610 cc.lceMu.Lock()
1611 defer cc.lceMu.Unlock()
1612 return cc.lastConnectionError
1613}
khenaidoo5cb0d402021-12-08 14:09:16 -05001614
1615func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
1616 channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
1617
1618 var rb resolver.Builder
1619 parsedTarget, err := parseTarget(cc.target)
1620 if err != nil {
1621 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
1622 } else {
1623 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
1624 rb = cc.getResolver(parsedTarget.Scheme)
1625 if rb != nil {
1626 cc.parsedTarget = parsedTarget
1627 return rb, nil
1628 }
1629 }
1630
1631 // We are here because the user's dial target did not contain a scheme or
1632 // specified an unregistered scheme. We should fallback to the default
1633 // scheme, except when a custom dialer is specified in which case, we should
1634 // always use passthrough scheme.
1635 defScheme := resolver.GetDefaultScheme()
1636 channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
1637 canonicalTarget := defScheme + ":///" + cc.target
1638
1639 parsedTarget, err = parseTarget(canonicalTarget)
1640 if err != nil {
1641 channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
1642 return nil, err
1643 }
1644 channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
1645 rb = cc.getResolver(parsedTarget.Scheme)
1646 if rb == nil {
1647 return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
1648 }
1649 cc.parsedTarget = parsedTarget
1650 return rb, nil
1651}
1652
1653// parseTarget uses RFC 3986 semantics to parse the given target into a
1654// resolver.Target struct containing scheme, authority and endpoint. Query
1655// params are stripped from the endpoint.
1656func parseTarget(target string) (resolver.Target, error) {
1657 u, err := url.Parse(target)
1658 if err != nil {
1659 return resolver.Target{}, err
1660 }
1661 // For targets of the form "[scheme]://[authority]/endpoint, the endpoint
1662 // value returned from url.Parse() contains a leading "/". Although this is
1663 // in accordance with RFC 3986, we do not want to break existing resolver
1664 // implementations which expect the endpoint without the leading "/". So, we
1665 // end up stripping the leading "/" here. But this will result in an
1666 // incorrect parsing for something like "unix:///path/to/socket". Since we
1667 // own the "unix" resolver, we can workaround in the unix resolver by using
1668 // the `URL` field instead of the `Endpoint` field.
1669 endpoint := u.Path
1670 if endpoint == "" {
1671 endpoint = u.Opaque
1672 }
1673 endpoint = strings.TrimPrefix(endpoint, "/")
1674 return resolver.Target{
1675 Scheme: u.Scheme,
1676 Authority: u.Host,
1677 Endpoint: endpoint,
1678 URL: *u,
1679 }, nil
1680}
1681
1682// Determine channel authority. The order of precedence is as follows:
1683// - user specified authority override using `WithAuthority` dial option
1684// - creds' notion of server name for the authentication handshake
1685// - endpoint from dial target of the form "scheme://[authority]/endpoint"
1686func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) {
1687 // Historically, we had two options for users to specify the serverName or
1688 // authority for a channel. One was through the transport credentials
1689 // (either in its constructor, or through the OverrideServerName() method).
1690 // The other option (for cases where WithInsecure() dial option was used)
1691 // was to use the WithAuthority() dial option.
1692 //
1693 // A few things have changed since:
1694 // - `insecure` package with an implementation of the `TransportCredentials`
1695 // interface for the insecure case
1696 // - WithAuthority() dial option support for secure credentials
1697 authorityFromCreds := ""
1698 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1699 authorityFromCreds = creds.Info().ServerName
1700 }
1701 authorityFromDialOption := dopts.authority
1702 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1703 return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1704 }
1705
1706 switch {
1707 case authorityFromDialOption != "":
1708 return authorityFromDialOption, nil
1709 case authorityFromCreds != "":
1710 return authorityFromCreds, nil
1711 case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
1712 // TODO: remove when the unix resolver implements optional interface to
1713 // return channel authority.
1714 return "localhost", nil
1715 case strings.HasPrefix(endpoint, ":"):
1716 return "localhost" + endpoint, nil
1717 default:
1718 // TODO: Define an optional interface on the resolver builder to return
1719 // the channel authority given the user's dial target. For resolvers
1720 // which don't implement this interface, we will use the endpoint from
1721 // "scheme://authority/endpoint" as the default authority.
1722 return endpoint, nil
1723 }
1724}