blob: f58740b25078a1a68892acbfa49321745bfec548 [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/grpcsync"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
53const (
54 // minimum time to give a connection to complete
55 minConnectTimeout = 20 * time.Second
56 // must match grpclbName in grpclb/grpclb.go
57 grpclbName = "grpclb"
58)
59
60var (
61 // ErrClientConnClosing indicates that the operation is illegal because
62 // the ClientConn is closing.
63 //
64 // Deprecated: this error should not be relied upon by users; use the status
65 // code of Canceled instead.
66 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
67 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
68 errConnDrain = errors.New("grpc: the connection is drained")
69 // errConnClosing indicates that the connection is closing.
70 errConnClosing = errors.New("grpc: the connection is closing")
71 // errBalancerClosed indicates that the balancer is closed.
72 errBalancerClosed = errors.New("grpc: balancer is closed")
73 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
74 // service config.
75 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
76)
77
78// The following errors are returned from Dial and DialContext
79var (
80 // errNoTransportSecurity indicates that there is no transport security
81 // being set for ClientConn. Users should either set one or explicitly
82 // call WithInsecure DialOption to disable security.
83 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
84 // errTransportCredsAndBundle indicates that creds bundle is used together
85 // with other individual Transport Credentials.
86 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
87 // errTransportCredentialsMissing indicates that users want to transmit security
88 // information (e.g., OAuth2 token) which requires secure connection on an insecure
89 // connection.
90 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
91 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
92 // and grpc.WithInsecure() are both called for a connection.
93 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
94)
95
96const (
97 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
98 defaultClientMaxSendMessageSize = math.MaxInt32
99 // http2IOBufSize specifies the buffer size for sending frames.
100 defaultWriteBufSize = 32 * 1024
101 defaultReadBufSize = 32 * 1024
102)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106 return DialContext(context.Background(), target, opts...)
107}
108
109// DialContext creates a client connection to the given target. By default, it's
110// a non-blocking dial (the function won't wait for connections to be
111// established, and connecting happens in the background). To make it a blocking
112// dial, use WithBlock() dial option.
113//
114// In the non-blocking case, the ctx does not act against the connection. It
115// only controls the setup steps.
116//
117// In the blocking case, ctx can be used to cancel or expire the pending
118// connection. Once this function returns, the cancellation and expiration of
119// ctx will be noop. Users should call ClientConn.Close to terminate all the
120// pending operations after this function returns.
121//
122// The target name syntax is defined in
123// https://github.com/grpc/grpc/blob/master/doc/naming.md.
124// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
125func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
126 cc := &ClientConn{
127 target: target,
128 csMgr: &connectivityStateManager{},
129 conns: make(map[*addrConn]struct{}),
130 dopts: defaultDialOptions(),
131 blockingpicker: newPickerWrapper(),
132 czData: new(channelzData),
133 firstResolveEvent: grpcsync.NewEvent(),
134 }
135 cc.retryThrottler.Store((*retryThrottler)(nil))
136 cc.ctx, cc.cancel = context.WithCancel(context.Background())
137
138 for _, opt := range opts {
139 opt.apply(&cc.dopts)
140 }
141
142 chainUnaryClientInterceptors(cc)
143 chainStreamClientInterceptors(cc)
144
145 defer func() {
146 if err != nil {
147 cc.Close()
148 }
149 }()
150
151 if channelz.IsOn() {
152 if cc.dopts.channelzParentID != 0 {
153 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
154 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
155 Desc: "Channel Created",
156 Severity: channelz.CtINFO,
157 Parent: &channelz.TraceEventDesc{
158 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
159 Severity: channelz.CtINFO,
160 },
161 })
162 } else {
163 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
164 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
165 Desc: "Channel Created",
166 Severity: channelz.CtINFO,
167 })
168 }
169 cc.csMgr.channelzID = cc.channelzID
170 }
171
172 if !cc.dopts.insecure {
173 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
174 return nil, errNoTransportSecurity
175 }
176 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
177 return nil, errTransportCredsAndBundle
178 }
179 } else {
180 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
181 return nil, errCredentialsConflict
182 }
183 for _, cd := range cc.dopts.copts.PerRPCCredentials {
184 if cd.RequireTransportSecurity() {
185 return nil, errTransportCredentialsMissing
186 }
187 }
188 }
189
190 if cc.dopts.defaultServiceConfigRawJSON != nil {
191 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
192 if scpr.Err != nil {
193 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
194 }
195 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
196 }
197 cc.mkp = cc.dopts.copts.KeepaliveParams
198
199 if cc.dopts.copts.Dialer == nil {
200 cc.dopts.copts.Dialer = newProxyDialer(
201 func(ctx context.Context, addr string) (net.Conn, error) {
202 network, addr := parseDialTarget(addr)
203 return (&net.Dialer{}).DialContext(ctx, network, addr)
204 },
205 )
206 }
207
208 if cc.dopts.copts.UserAgent != "" {
209 cc.dopts.copts.UserAgent += " " + grpcUA
210 } else {
211 cc.dopts.copts.UserAgent = grpcUA
212 }
213
214 if cc.dopts.timeout > 0 {
215 var cancel context.CancelFunc
216 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
217 defer cancel()
218 }
219 defer func() {
220 select {
221 case <-ctx.Done():
222 conn, err = nil, ctx.Err()
223 default:
224 }
225 }()
226
227 scSet := false
228 if cc.dopts.scChan != nil {
229 // Try to get an initial service config.
230 select {
231 case sc, ok := <-cc.dopts.scChan:
232 if ok {
233 cc.sc = &sc
234 scSet = true
235 }
236 default:
237 }
238 }
239 if cc.dopts.bs == nil {
240 cc.dopts.bs = backoff.DefaultExponential
241 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000242
243 // Determine the resolver to use.
244 cc.parsedTarget = parseTarget(cc.target)
245 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
246 resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
247 if resolverBuilder == nil {
248 // If resolver builder is still nil, the parsed target's scheme is
249 // not registered. Fallback to default resolver and set Endpoint to
250 // the original target.
251 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
252 cc.parsedTarget = resolver.Target{
253 Scheme: resolver.GetDefaultScheme(),
254 Endpoint: target,
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000255 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000256 resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
257 if resolverBuilder == nil {
258 return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
259 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000260 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000261
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000262 creds := cc.dopts.copts.TransportCredentials
263 if creds != nil && creds.Info().ServerName != "" {
264 cc.authority = creds.Info().ServerName
265 } else if cc.dopts.insecure && cc.dopts.authority != "" {
266 cc.authority = cc.dopts.authority
267 } else {
268 // Use endpoint from "scheme://authority/endpoint" as the default
269 // authority for ClientConn.
270 cc.authority = cc.parsedTarget.Endpoint
271 }
272
273 if cc.dopts.scChan != nil && !scSet {
274 // Blocking wait for the initial service config.
275 select {
276 case sc, ok := <-cc.dopts.scChan:
277 if ok {
278 cc.sc = &sc
279 }
280 case <-ctx.Done():
281 return nil, ctx.Err()
282 }
283 }
284 if cc.dopts.scChan != nil {
285 go cc.scWatcher()
286 }
287
288 var credsClone credentials.TransportCredentials
289 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
290 credsClone = creds.Clone()
291 }
292 cc.balancerBuildOpts = balancer.BuildOptions{
293 DialCreds: credsClone,
294 CredsBundle: cc.dopts.copts.CredsBundle,
295 Dialer: cc.dopts.copts.Dialer,
296 ChannelzParentID: cc.channelzID,
297 Target: cc.parsedTarget,
298 }
299
300 // Build the resolver.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000301 rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000302 if err != nil {
303 return nil, fmt.Errorf("failed to build resolver: %v", err)
304 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000305 cc.mu.Lock()
306 cc.resolverWrapper = rWrapper
307 cc.mu.Unlock()
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000308
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000309 // A blocking dial blocks until the clientConn is ready.
310 if cc.dopts.block {
311 for {
312 s := cc.GetState()
313 if s == connectivity.Ready {
314 break
315 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
316 if err = cc.blockingpicker.connectionError(); err != nil {
317 terr, ok := err.(interface {
318 Temporary() bool
319 })
320 if ok && !terr.Temporary() {
321 return nil, err
322 }
323 }
324 }
325 if !cc.WaitForStateChange(ctx, s) {
326 // ctx got timeout or canceled.
327 return nil, ctx.Err()
328 }
329 }
330 }
331
332 return cc, nil
333}
334
335// chainUnaryClientInterceptors chains all unary client interceptors into one.
336func chainUnaryClientInterceptors(cc *ClientConn) {
337 interceptors := cc.dopts.chainUnaryInts
338 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
339 // be executed before any other chained interceptors.
340 if cc.dopts.unaryInt != nil {
341 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
342 }
343 var chainedInt UnaryClientInterceptor
344 if len(interceptors) == 0 {
345 chainedInt = nil
346 } else if len(interceptors) == 1 {
347 chainedInt = interceptors[0]
348 } else {
349 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
350 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
351 }
352 }
353 cc.dopts.unaryInt = chainedInt
354}
355
356// getChainUnaryInvoker recursively generate the chained unary invoker.
357func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
358 if curr == len(interceptors)-1 {
359 return finalInvoker
360 }
361 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
362 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
363 }
364}
365
366// chainStreamClientInterceptors chains all stream client interceptors into one.
367func chainStreamClientInterceptors(cc *ClientConn) {
368 interceptors := cc.dopts.chainStreamInts
369 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
370 // be executed before any other chained interceptors.
371 if cc.dopts.streamInt != nil {
372 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
373 }
374 var chainedInt StreamClientInterceptor
375 if len(interceptors) == 0 {
376 chainedInt = nil
377 } else if len(interceptors) == 1 {
378 chainedInt = interceptors[0]
379 } else {
380 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
381 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
382 }
383 }
384 cc.dopts.streamInt = chainedInt
385}
386
387// getChainStreamer recursively generate the chained client stream constructor.
388func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
389 if curr == len(interceptors)-1 {
390 return finalStreamer
391 }
392 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
393 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
394 }
395}
396
397// connectivityStateManager keeps the connectivity.State of ClientConn.
398// This struct will eventually be exported so the balancers can access it.
399type connectivityStateManager struct {
400 mu sync.Mutex
401 state connectivity.State
402 notifyChan chan struct{}
403 channelzID int64
404}
405
406// updateState updates the connectivity.State of ClientConn.
407// If there's a change it notifies goroutines waiting on state change to
408// happen.
409func (csm *connectivityStateManager) updateState(state connectivity.State) {
410 csm.mu.Lock()
411 defer csm.mu.Unlock()
412 if csm.state == connectivity.Shutdown {
413 return
414 }
415 if csm.state == state {
416 return
417 }
418 csm.state = state
419 if channelz.IsOn() {
420 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
421 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
422 Severity: channelz.CtINFO,
423 })
424 }
425 if csm.notifyChan != nil {
426 // There are other goroutines waiting on this channel.
427 close(csm.notifyChan)
428 csm.notifyChan = nil
429 }
430}
431
432func (csm *connectivityStateManager) getState() connectivity.State {
433 csm.mu.Lock()
434 defer csm.mu.Unlock()
435 return csm.state
436}
437
438func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
439 csm.mu.Lock()
440 defer csm.mu.Unlock()
441 if csm.notifyChan == nil {
442 csm.notifyChan = make(chan struct{})
443 }
444 return csm.notifyChan
445}
446
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000447// ClientConnInterface defines the functions clients need to perform unary and
448// streaming RPCs. It is implemented by *ClientConn, and is only intended to
449// be referenced by generated code.
450type ClientConnInterface interface {
451 // Invoke performs a unary RPC and returns after the response is received
452 // into reply.
453 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
454 // NewStream begins a streaming RPC.
455 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
456}
457
458// Assert *ClientConn implements ClientConnInterface.
459var _ ClientConnInterface = (*ClientConn)(nil)
460
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000461// ClientConn represents a virtual connection to a conceptual endpoint, to
462// perform RPCs.
463//
464// A ClientConn is free to have zero or more actual connections to the endpoint
465// based on configuration, load, etc. It is also free to determine which actual
466// endpoints to use and may change it every RPC, permitting client-side load
467// balancing.
468//
469// A ClientConn encapsulates a range of functionality including name
470// resolution, TCP connection establishment (with retries and backoff) and TLS
471// handshakes. It also handles errors on established connections by
472// re-resolving the name and reconnecting.
473type ClientConn struct {
474 ctx context.Context
475 cancel context.CancelFunc
476
477 target string
478 parsedTarget resolver.Target
479 authority string
480 dopts dialOptions
481 csMgr *connectivityStateManager
482
483 balancerBuildOpts balancer.BuildOptions
484 blockingpicker *pickerWrapper
485
486 mu sync.RWMutex
487 resolverWrapper *ccResolverWrapper
488 sc *ServiceConfig
489 conns map[*addrConn]struct{}
490 // Keepalive parameter can be updated if a GoAway is received.
491 mkp keepalive.ClientParameters
492 curBalancerName string
493 balancerWrapper *ccBalancerWrapper
494 retryThrottler atomic.Value
495
496 firstResolveEvent *grpcsync.Event
497
498 channelzID int64 // channelz unique identification number
499 czData *channelzData
500}
501
502// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
503// ctx expires. A true value is returned in former case and false in latter.
504// This is an EXPERIMENTAL API.
505func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
506 ch := cc.csMgr.getNotifyChan()
507 if cc.csMgr.getState() != sourceState {
508 return true
509 }
510 select {
511 case <-ctx.Done():
512 return false
513 case <-ch:
514 return true
515 }
516}
517
518// GetState returns the connectivity.State of ClientConn.
519// This is an EXPERIMENTAL API.
520func (cc *ClientConn) GetState() connectivity.State {
521 return cc.csMgr.getState()
522}
523
524func (cc *ClientConn) scWatcher() {
525 for {
526 select {
527 case sc, ok := <-cc.dopts.scChan:
528 if !ok {
529 return
530 }
531 cc.mu.Lock()
532 // TODO: load balance policy runtime change is ignored.
533 // We may revisit this decision in the future.
534 cc.sc = &sc
535 cc.mu.Unlock()
536 case <-cc.ctx.Done():
537 return
538 }
539 }
540}
541
542// waitForResolvedAddrs blocks until the resolver has provided addresses or the
543// context expires. Returns nil unless the context expires first; otherwise
544// returns a status error based on the context.
545func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
546 // This is on the RPC path, so we use a fast path to avoid the
547 // more-expensive "select" below after the resolver has returned once.
548 if cc.firstResolveEvent.HasFired() {
549 return nil
550 }
551 select {
552 case <-cc.firstResolveEvent.Done():
553 return nil
554 case <-ctx.Done():
555 return status.FromContextError(ctx.Err()).Err()
556 case <-cc.ctx.Done():
557 return ErrClientConnClosing
558 }
559}
560
561var emptyServiceConfig *ServiceConfig
562
563func init() {
564 cfg := parseServiceConfig("{}")
565 if cfg.Err != nil {
566 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
567 }
568 emptyServiceConfig = cfg.Config.(*ServiceConfig)
569}
570
571func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
572 if cc.sc != nil {
573 cc.applyServiceConfigAndBalancer(cc.sc, addrs)
574 return
575 }
576 if cc.dopts.defaultServiceConfig != nil {
577 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
578 } else {
579 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
580 }
581}
582
583func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
584 defer cc.firstResolveEvent.Fire()
585 cc.mu.Lock()
586 // Check if the ClientConn is already closed. Some fields (e.g.
587 // balancerWrapper) are set to nil when closing the ClientConn, and could
588 // cause nil pointer panic if we don't have this check.
589 if cc.conns == nil {
590 cc.mu.Unlock()
591 return nil
592 }
593
594 if err != nil {
595 // May need to apply the initial service config in case the resolver
596 // doesn't support service configs, or doesn't provide a service config
597 // with the new addresses.
598 cc.maybeApplyDefaultServiceConfig(nil)
599
600 if cc.balancerWrapper != nil {
601 cc.balancerWrapper.resolverError(err)
602 }
603
604 // No addresses are valid with err set; return early.
605 cc.mu.Unlock()
606 return balancer.ErrBadResolverState
607 }
608
609 var ret error
610 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
611 cc.maybeApplyDefaultServiceConfig(s.Addresses)
612 // TODO: do we need to apply a failing LB policy if there is no
613 // default, per the error handling design?
614 } else {
615 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
616 cc.applyServiceConfigAndBalancer(sc, s.Addresses)
617 } else {
618 ret = balancer.ErrBadResolverState
619 if cc.balancerWrapper == nil {
620 var err error
621 if s.ServiceConfig.Err != nil {
622 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
623 } else {
624 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
625 }
626 cc.blockingpicker.updatePicker(base.NewErrPicker(err))
627 cc.csMgr.updateState(connectivity.TransientFailure)
628 cc.mu.Unlock()
629 return ret
630 }
631 }
632 }
633
634 var balCfg serviceconfig.LoadBalancingConfig
635 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
636 balCfg = cc.sc.lbConfig.cfg
637 }
638
639 cbn := cc.curBalancerName
640 bw := cc.balancerWrapper
641 cc.mu.Unlock()
642 if cbn != grpclbName {
643 // Filter any grpclb addresses since we don't have the grpclb balancer.
644 for i := 0; i < len(s.Addresses); {
645 if s.Addresses[i].Type == resolver.GRPCLB {
646 copy(s.Addresses[i:], s.Addresses[i+1:])
647 s.Addresses = s.Addresses[:len(s.Addresses)-1]
648 continue
649 }
650 i++
651 }
652 }
653 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
654 if ret == nil {
655 ret = uccsErr // prefer ErrBadResolver state since any other error is
656 // currently meaningless to the caller.
657 }
658 return ret
659}
660
661// switchBalancer starts the switching from current balancer to the balancer
662// with the given name.
663//
664// It will NOT send the current address list to the new balancer. If needed,
665// caller of this function should send address list to the new balancer after
666// this function returns.
667//
668// Caller must hold cc.mu.
669func (cc *ClientConn) switchBalancer(name string) {
670 if strings.EqualFold(cc.curBalancerName, name) {
671 return
672 }
673
674 grpclog.Infof("ClientConn switching balancer to %q", name)
675 if cc.dopts.balancerBuilder != nil {
676 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
677 return
678 }
679 if cc.balancerWrapper != nil {
680 cc.balancerWrapper.close()
681 }
682
683 builder := balancer.Get(name)
684 if channelz.IsOn() {
685 if builder == nil {
686 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
687 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
688 Severity: channelz.CtWarning,
689 })
690 } else {
691 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
692 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
693 Severity: channelz.CtINFO,
694 })
695 }
696 }
697 if builder == nil {
698 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
699 builder = newPickfirstBuilder()
700 }
701
702 cc.curBalancerName = builder.Name()
703 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
704}
705
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000706func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000707 cc.mu.Lock()
708 if cc.conns == nil {
709 cc.mu.Unlock()
710 return
711 }
712 // TODO(bar switching) send updates to all balancer wrappers when balancer
713 // gracefully switching is supported.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000714 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000715 cc.mu.Unlock()
716}
717
718// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
719//
720// Caller needs to make sure len(addrs) > 0.
721func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
722 ac := &addrConn{
723 cc: cc,
724 addrs: addrs,
725 scopts: opts,
726 dopts: cc.dopts,
727 czData: new(channelzData),
728 resetBackoff: make(chan struct{}),
729 }
730 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
731 // Track ac in cc. This needs to be done before any getTransport(...) is called.
732 cc.mu.Lock()
733 if cc.conns == nil {
734 cc.mu.Unlock()
735 return nil, ErrClientConnClosing
736 }
737 if channelz.IsOn() {
738 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
739 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
740 Desc: "Subchannel Created",
741 Severity: channelz.CtINFO,
742 Parent: &channelz.TraceEventDesc{
743 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
744 Severity: channelz.CtINFO,
745 },
746 })
747 }
748 cc.conns[ac] = struct{}{}
749 cc.mu.Unlock()
750 return ac, nil
751}
752
753// removeAddrConn removes the addrConn in the subConn from clientConn.
754// It also tears down the ac with the given error.
755func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
756 cc.mu.Lock()
757 if cc.conns == nil {
758 cc.mu.Unlock()
759 return
760 }
761 delete(cc.conns, ac)
762 cc.mu.Unlock()
763 ac.tearDown(err)
764}
765
766func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
767 return &channelz.ChannelInternalMetric{
768 State: cc.GetState(),
769 Target: cc.target,
770 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
771 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
772 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
773 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
774 }
775}
776
777// Target returns the target string of the ClientConn.
778// This is an EXPERIMENTAL API.
779func (cc *ClientConn) Target() string {
780 return cc.target
781}
782
783func (cc *ClientConn) incrCallsStarted() {
784 atomic.AddInt64(&cc.czData.callsStarted, 1)
785 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
786}
787
788func (cc *ClientConn) incrCallsSucceeded() {
789 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
790}
791
792func (cc *ClientConn) incrCallsFailed() {
793 atomic.AddInt64(&cc.czData.callsFailed, 1)
794}
795
796// connect starts creating a transport.
797// It does nothing if the ac is not IDLE.
798// TODO(bar) Move this to the addrConn section.
799func (ac *addrConn) connect() error {
800 ac.mu.Lock()
801 if ac.state == connectivity.Shutdown {
802 ac.mu.Unlock()
803 return errConnClosing
804 }
805 if ac.state != connectivity.Idle {
806 ac.mu.Unlock()
807 return nil
808 }
809 // Update connectivity state within the lock to prevent subsequent or
810 // concurrent calls from resetting the transport more than once.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000811 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000812 ac.mu.Unlock()
813
814 // Start a goroutine connecting to the server asynchronously.
815 go ac.resetTransport()
816 return nil
817}
818
819// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
820//
821// If ac is Connecting, it returns false. The caller should tear down the ac and
822// create a new one. Note that the backoff will be reset when this happens.
823//
824// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
825// addresses will be picked up by retry in the next iteration after backoff.
826//
827// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
828//
829// If ac is Ready, it checks whether current connected address of ac is in the
830// new addrs list.
831// - If true, it updates ac.addrs and returns true. The ac will keep using
832// the existing connection.
833// - If false, it does nothing and returns false.
834func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
835 ac.mu.Lock()
836 defer ac.mu.Unlock()
837 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
838 if ac.state == connectivity.Shutdown ||
839 ac.state == connectivity.TransientFailure ||
840 ac.state == connectivity.Idle {
841 ac.addrs = addrs
842 return true
843 }
844
845 if ac.state == connectivity.Connecting {
846 return false
847 }
848
849 // ac.state is Ready, try to find the connected address.
850 var curAddrFound bool
851 for _, a := range addrs {
852 if reflect.DeepEqual(ac.curAddr, a) {
853 curAddrFound = true
854 break
855 }
856 }
857 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
858 if curAddrFound {
859 ac.addrs = addrs
860 }
861
862 return curAddrFound
863}
864
865// GetMethodConfig gets the method config of the input method.
866// If there's an exact match for input method (i.e. /service/method), we return
867// the corresponding MethodConfig.
868// If there isn't an exact match for the input method, we look for the default config
869// under the service (i.e /service/). If there is a default MethodConfig for
870// the service, we return it.
871// Otherwise, we return an empty MethodConfig.
872func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
873 // TODO: Avoid the locking here.
874 cc.mu.RLock()
875 defer cc.mu.RUnlock()
876 if cc.sc == nil {
877 return MethodConfig{}
878 }
879 m, ok := cc.sc.Methods[method]
880 if !ok {
881 i := strings.LastIndex(method, "/")
882 m = cc.sc.Methods[method[:i+1]]
883 }
884 return m
885}
886
887func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
888 cc.mu.RLock()
889 defer cc.mu.RUnlock()
890 if cc.sc == nil {
891 return nil
892 }
893 return cc.sc.healthCheckConfig
894}
895
896func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000897 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
898 Ctx: ctx,
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000899 FullMethodName: method,
900 })
901 if err != nil {
902 return nil, nil, toRPCErr(err)
903 }
904 return t, done, nil
905}
906
907func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
908 if sc == nil {
909 // should never reach here.
910 return
911 }
912 cc.sc = sc
913
914 if cc.sc.retryThrottling != nil {
915 newThrottler := &retryThrottler{
916 tokens: cc.sc.retryThrottling.MaxTokens,
917 max: cc.sc.retryThrottling.MaxTokens,
918 thresh: cc.sc.retryThrottling.MaxTokens / 2,
919 ratio: cc.sc.retryThrottling.TokenRatio,
920 }
921 cc.retryThrottler.Store(newThrottler)
922 } else {
923 cc.retryThrottler.Store((*retryThrottler)(nil))
924 }
925
926 if cc.dopts.balancerBuilder == nil {
927 // Only look at balancer types and switch balancer if balancer dial
928 // option is not set.
929 var newBalancerName string
930 if cc.sc != nil && cc.sc.lbConfig != nil {
931 newBalancerName = cc.sc.lbConfig.name
932 } else {
933 var isGRPCLB bool
934 for _, a := range addrs {
935 if a.Type == resolver.GRPCLB {
936 isGRPCLB = true
937 break
938 }
939 }
940 if isGRPCLB {
941 newBalancerName = grpclbName
942 } else if cc.sc != nil && cc.sc.LB != nil {
943 newBalancerName = *cc.sc.LB
944 } else {
945 newBalancerName = PickFirstBalancerName
946 }
947 }
948 cc.switchBalancer(newBalancerName)
949 } else if cc.balancerWrapper == nil {
950 // Balancer dial option was set, and this is the first time handling
951 // resolved addresses. Build a balancer with dopts.balancerBuilder.
952 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
953 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
954 }
955}
956
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000957func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000958 cc.mu.RLock()
959 r := cc.resolverWrapper
960 cc.mu.RUnlock()
961 if r == nil {
962 return
963 }
964 go r.resolveNow(o)
965}
966
967// ResetConnectBackoff wakes up all subchannels in transient failure and causes
968// them to attempt another connection immediately. It also resets the backoff
969// times used for subsequent attempts regardless of the current state.
970//
971// In general, this function should not be used. Typical service or network
972// outages result in a reasonable client reconnection strategy by default.
973// However, if a previously unavailable network becomes available, this may be
974// used to trigger an immediate reconnect.
975//
976// This API is EXPERIMENTAL.
977func (cc *ClientConn) ResetConnectBackoff() {
978 cc.mu.Lock()
979 conns := cc.conns
980 cc.mu.Unlock()
981 for ac := range conns {
982 ac.resetConnectBackoff()
983 }
984}
985
986// Close tears down the ClientConn and all underlying connections.
987func (cc *ClientConn) Close() error {
988 defer cc.cancel()
989
990 cc.mu.Lock()
991 if cc.conns == nil {
992 cc.mu.Unlock()
993 return ErrClientConnClosing
994 }
995 conns := cc.conns
996 cc.conns = nil
997 cc.csMgr.updateState(connectivity.Shutdown)
998
999 rWrapper := cc.resolverWrapper
1000 cc.resolverWrapper = nil
1001 bWrapper := cc.balancerWrapper
1002 cc.balancerWrapper = nil
1003 cc.mu.Unlock()
1004
1005 cc.blockingpicker.close()
1006
1007 if rWrapper != nil {
1008 rWrapper.close()
1009 }
1010 if bWrapper != nil {
1011 bWrapper.close()
1012 }
1013
1014 for ac := range conns {
1015 ac.tearDown(ErrClientConnClosing)
1016 }
1017 if channelz.IsOn() {
1018 ted := &channelz.TraceEventDesc{
1019 Desc: "Channel Deleted",
1020 Severity: channelz.CtINFO,
1021 }
1022 if cc.dopts.channelzParentID != 0 {
1023 ted.Parent = &channelz.TraceEventDesc{
1024 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1025 Severity: channelz.CtINFO,
1026 }
1027 }
1028 channelz.AddTraceEvent(cc.channelzID, ted)
1029 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1030 // the entity being deleted, and thus prevent it from being deleted right away.
1031 channelz.RemoveEntry(cc.channelzID)
1032 }
1033 return nil
1034}
1035
1036// addrConn is a network connection to a given address.
1037type addrConn struct {
1038 ctx context.Context
1039 cancel context.CancelFunc
1040
1041 cc *ClientConn
1042 dopts dialOptions
1043 acbw balancer.SubConn
1044 scopts balancer.NewSubConnOptions
1045
1046 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
1047 // health checking may require server to report healthy to set ac to READY), and is reset
1048 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
1049 // is received, transport is closed, ac has been torn down).
1050 transport transport.ClientTransport // The current transport.
1051
1052 mu sync.Mutex
1053 curAddr resolver.Address // The current address.
1054 addrs []resolver.Address // All addresses that the resolver resolved to.
1055
1056 // Use updateConnectivityState for updating addrConn's connectivity state.
1057 state connectivity.State
1058
1059 backoffIdx int // Needs to be stateful for resetConnectBackoff.
1060 resetBackoff chan struct{}
1061
1062 channelzID int64 // channelz unique identification number.
1063 czData *channelzData
1064}
1065
1066// Note: this requires a lock on ac.mu.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001067func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001068 if ac.state == s {
1069 return
1070 }
1071
1072 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
1073 ac.state = s
1074 if channelz.IsOn() {
1075 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1076 Desc: updateMsg,
1077 Severity: channelz.CtINFO,
1078 })
1079 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001080 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001081}
1082
1083// adjustParams updates parameters used to create transports upon
1084// receiving a GoAway.
1085func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1086 switch r {
1087 case transport.GoAwayTooManyPings:
1088 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1089 ac.cc.mu.Lock()
1090 if v > ac.cc.mkp.Time {
1091 ac.cc.mkp.Time = v
1092 }
1093 ac.cc.mu.Unlock()
1094 }
1095}
1096
1097func (ac *addrConn) resetTransport() {
1098 for i := 0; ; i++ {
1099 if i > 0 {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001100 ac.cc.resolveNow(resolver.ResolveNowOptions{})
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001101 }
1102
1103 ac.mu.Lock()
1104 if ac.state == connectivity.Shutdown {
1105 ac.mu.Unlock()
1106 return
1107 }
1108
1109 addrs := ac.addrs
1110 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1111 // This will be the duration that dial gets to finish.
1112 dialDuration := minConnectTimeout
1113 if ac.dopts.minConnectTimeout != nil {
1114 dialDuration = ac.dopts.minConnectTimeout()
1115 }
1116
1117 if dialDuration < backoffFor {
1118 // Give dial more time as we keep failing to connect.
1119 dialDuration = backoffFor
1120 }
1121 // We can potentially spend all the time trying the first address, and
1122 // if the server accepts the connection and then hangs, the following
1123 // addresses will never be tried.
1124 //
1125 // The spec doesn't mention what should be done for multiple addresses.
1126 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
1127 connectDeadline := time.Now().Add(dialDuration)
1128
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001129 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001130 ac.transport = nil
1131 ac.mu.Unlock()
1132
1133 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
1134 if err != nil {
1135 // After exhausting all addresses, the addrConn enters
1136 // TRANSIENT_FAILURE.
1137 ac.mu.Lock()
1138 if ac.state == connectivity.Shutdown {
1139 ac.mu.Unlock()
1140 return
1141 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001142 ac.updateConnectivityState(connectivity.TransientFailure, err)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001143
1144 // Backoff.
1145 b := ac.resetBackoff
1146 ac.mu.Unlock()
1147
1148 timer := time.NewTimer(backoffFor)
1149 select {
1150 case <-timer.C:
1151 ac.mu.Lock()
1152 ac.backoffIdx++
1153 ac.mu.Unlock()
1154 case <-b:
1155 timer.Stop()
1156 case <-ac.ctx.Done():
1157 timer.Stop()
1158 return
1159 }
1160 continue
1161 }
1162
1163 ac.mu.Lock()
1164 if ac.state == connectivity.Shutdown {
1165 ac.mu.Unlock()
1166 newTr.Close()
1167 return
1168 }
1169 ac.curAddr = addr
1170 ac.transport = newTr
1171 ac.backoffIdx = 0
1172
1173 hctx, hcancel := context.WithCancel(ac.ctx)
1174 ac.startHealthCheck(hctx)
1175 ac.mu.Unlock()
1176
1177 // Block until the created transport is down. And when this happens,
1178 // we restart from the top of the addr list.
1179 <-reconnect.Done()
1180 hcancel()
1181 // restart connecting - the top of the loop will set state to
1182 // CONNECTING. This is against the current connectivity semantics doc,
1183 // however it allows for graceful behavior for RPCs not yet dispatched
1184 // - unfortunate timing would otherwise lead to the RPC failing even
1185 // though the TRANSIENT_FAILURE state (called for by the doc) would be
1186 // instantaneous.
1187 //
1188 // Ideally we should transition to Idle here and block until there is
1189 // RPC activity that leads to the balancer requesting a reconnect of
1190 // the associated SubConn.
1191 }
1192}
1193
1194// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1195// first successful one. It returns the transport, the address and a Event in
1196// the successful case. The Event fires when the returned transport disconnects.
1197func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001198 var firstConnErr error
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001199 for _, addr := range addrs {
1200 ac.mu.Lock()
1201 if ac.state == connectivity.Shutdown {
1202 ac.mu.Unlock()
1203 return nil, resolver.Address{}, nil, errConnClosing
1204 }
1205
1206 ac.cc.mu.RLock()
1207 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1208 ac.cc.mu.RUnlock()
1209
1210 copts := ac.dopts.copts
1211 if ac.scopts.CredsBundle != nil {
1212 copts.CredsBundle = ac.scopts.CredsBundle
1213 }
1214 ac.mu.Unlock()
1215
1216 if channelz.IsOn() {
1217 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1218 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1219 Severity: channelz.CtINFO,
1220 })
1221 }
1222
1223 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1224 if err == nil {
1225 return newTr, addr, reconnect, nil
1226 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001227 if firstConnErr == nil {
1228 firstConnErr = err
1229 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001230 ac.cc.blockingpicker.updateConnectionError(err)
1231 }
1232
1233 // Couldn't connect to any address.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001234 return nil, resolver.Address{}, nil, firstConnErr
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001235}
1236
1237// createTransport creates a connection to addr. It returns the transport and a
1238// Event in the successful case. The Event fires when the returned transport
1239// disconnects.
1240func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1241 prefaceReceived := make(chan struct{})
1242 onCloseCalled := make(chan struct{})
1243 reconnect := grpcsync.NewEvent()
1244
1245 authority := ac.cc.authority
1246 // addr.ServerName takes precedent over ClientConn authority, if present.
1247 if addr.ServerName != "" {
1248 authority = addr.ServerName
1249 }
1250
1251 target := transport.TargetInfo{
1252 Addr: addr.Addr,
1253 Metadata: addr.Metadata,
1254 Authority: authority,
1255 }
1256
1257 once := sync.Once{}
1258 onGoAway := func(r transport.GoAwayReason) {
1259 ac.mu.Lock()
1260 ac.adjustParams(r)
1261 once.Do(func() {
1262 if ac.state == connectivity.Ready {
1263 // Prevent this SubConn from being used for new RPCs by setting its
1264 // state to Connecting.
1265 //
1266 // TODO: this should be Idle when grpc-go properly supports it.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001267 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001268 }
1269 })
1270 ac.mu.Unlock()
1271 reconnect.Fire()
1272 }
1273
1274 onClose := func() {
1275 ac.mu.Lock()
1276 once.Do(func() {
1277 if ac.state == connectivity.Ready {
1278 // Prevent this SubConn from being used for new RPCs by setting its
1279 // state to Connecting.
1280 //
1281 // TODO: this should be Idle when grpc-go properly supports it.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001282 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001283 }
1284 })
1285 ac.mu.Unlock()
1286 close(onCloseCalled)
1287 reconnect.Fire()
1288 }
1289
1290 onPrefaceReceipt := func() {
1291 close(prefaceReceived)
1292 }
1293
1294 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1295 defer cancel()
1296 if channelz.IsOn() {
1297 copts.ChannelzParentID = ac.channelzID
1298 }
1299
1300 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1301 if err != nil {
1302 // newTr is either nil, or closed.
1303 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1304 return nil, nil, err
1305 }
1306
1307 select {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001308 case <-time.After(time.Until(connectDeadline)):
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001309 // We didn't get the preface in time.
1310 newTr.Close()
1311 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1312 return nil, nil, errors.New("timed out waiting for server handshake")
1313 case <-prefaceReceived:
1314 // We got the preface - huzzah! things are good.
1315 case <-onCloseCalled:
1316 // The transport has already closed - noop.
1317 return nil, nil, errors.New("connection closed")
1318 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1319 }
1320 return newTr, reconnect, nil
1321}
1322
1323// startHealthCheck starts the health checking stream (RPC) to watch the health
1324// stats of this connection if health checking is requested and configured.
1325//
1326// LB channel health checking is enabled when all requirements below are met:
1327// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1328// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
1329// 3. a service config with non-empty healthCheckConfig field is provided
1330// 4. the load balancer requests it
1331//
1332// It sets addrConn to READY if the health checking stream is not started.
1333//
1334// Caller must hold ac.mu.
1335func (ac *addrConn) startHealthCheck(ctx context.Context) {
1336 var healthcheckManagingState bool
1337 defer func() {
1338 if !healthcheckManagingState {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001339 ac.updateConnectivityState(connectivity.Ready, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001340 }
1341 }()
1342
1343 if ac.cc.dopts.disableHealthCheck {
1344 return
1345 }
1346 healthCheckConfig := ac.cc.healthCheckConfig()
1347 if healthCheckConfig == nil {
1348 return
1349 }
1350 if !ac.scopts.HealthCheckEnabled {
1351 return
1352 }
1353 healthCheckFunc := ac.cc.dopts.healthCheckFunc
1354 if healthCheckFunc == nil {
1355 // The health package is not imported to set health check function.
1356 //
1357 // TODO: add a link to the health check doc in the error message.
1358 grpclog.Error("Health check is requested but health check function is not set.")
1359 return
1360 }
1361
1362 healthcheckManagingState = true
1363
1364 // Set up the health check helper functions.
1365 currentTr := ac.transport
1366 newStream := func(method string) (interface{}, error) {
1367 ac.mu.Lock()
1368 if ac.transport != currentTr {
1369 ac.mu.Unlock()
1370 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1371 }
1372 ac.mu.Unlock()
1373 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1374 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001375 setConnectivityState := func(s connectivity.State, lastErr error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001376 ac.mu.Lock()
1377 defer ac.mu.Unlock()
1378 if ac.transport != currentTr {
1379 return
1380 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001381 ac.updateConnectivityState(s, lastErr)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001382 }
1383 // Start the health checking stream.
1384 go func() {
1385 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1386 if err != nil {
1387 if status.Code(err) == codes.Unimplemented {
1388 if channelz.IsOn() {
1389 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1390 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1391 Severity: channelz.CtError,
1392 })
1393 }
1394 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1395 } else {
1396 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1397 }
1398 }
1399 }()
1400}
1401
1402func (ac *addrConn) resetConnectBackoff() {
1403 ac.mu.Lock()
1404 close(ac.resetBackoff)
1405 ac.backoffIdx = 0
1406 ac.resetBackoff = make(chan struct{})
1407 ac.mu.Unlock()
1408}
1409
1410// getReadyTransport returns the transport if ac's state is READY.
1411// Otherwise it returns nil, false.
1412// If ac's state is IDLE, it will trigger ac to connect.
1413func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1414 ac.mu.Lock()
1415 if ac.state == connectivity.Ready && ac.transport != nil {
1416 t := ac.transport
1417 ac.mu.Unlock()
1418 return t, true
1419 }
1420 var idle bool
1421 if ac.state == connectivity.Idle {
1422 idle = true
1423 }
1424 ac.mu.Unlock()
1425 // Trigger idle ac to connect.
1426 if idle {
1427 ac.connect()
1428 }
1429 return nil, false
1430}
1431
1432// tearDown starts to tear down the addrConn.
1433// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1434// some edge cases (e.g., the caller opens and closes many addrConn's in a
1435// tight loop.
1436// tearDown doesn't remove ac from ac.cc.conns.
1437func (ac *addrConn) tearDown(err error) {
1438 ac.mu.Lock()
1439 if ac.state == connectivity.Shutdown {
1440 ac.mu.Unlock()
1441 return
1442 }
1443 curTr := ac.transport
1444 ac.transport = nil
1445 // We have to set the state to Shutdown before anything else to prevent races
1446 // between setting the state and logic that waits on context cancellation / etc.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001447 ac.updateConnectivityState(connectivity.Shutdown, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001448 ac.cancel()
1449 ac.curAddr = resolver.Address{}
1450 if err == errConnDrain && curTr != nil {
1451 // GracefulClose(...) may be executed multiple times when
1452 // i) receiving multiple GoAway frames from the server; or
1453 // ii) there are concurrent name resolver/Balancer triggered
1454 // address removal and GoAway.
1455 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1456 ac.mu.Unlock()
1457 curTr.GracefulClose()
1458 ac.mu.Lock()
1459 }
1460 if channelz.IsOn() {
1461 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1462 Desc: "Subchannel Deleted",
1463 Severity: channelz.CtINFO,
1464 Parent: &channelz.TraceEventDesc{
1465 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1466 Severity: channelz.CtINFO,
1467 },
1468 })
1469 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1470 // the entity being deleted, and thus prevent it from being deleted right away.
1471 channelz.RemoveEntry(ac.channelzID)
1472 }
1473 ac.mu.Unlock()
1474}
1475
1476func (ac *addrConn) getState() connectivity.State {
1477 ac.mu.Lock()
1478 defer ac.mu.Unlock()
1479 return ac.state
1480}
1481
1482func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1483 ac.mu.Lock()
1484 addr := ac.curAddr.Addr
1485 ac.mu.Unlock()
1486 return &channelz.ChannelInternalMetric{
1487 State: ac.getState(),
1488 Target: addr,
1489 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1490 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1491 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1492 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1493 }
1494}
1495
1496func (ac *addrConn) incrCallsStarted() {
1497 atomic.AddInt64(&ac.czData.callsStarted, 1)
1498 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1499}
1500
1501func (ac *addrConn) incrCallsSucceeded() {
1502 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1503}
1504
1505func (ac *addrConn) incrCallsFailed() {
1506 atomic.AddInt64(&ac.czData.callsFailed, 1)
1507}
1508
1509type retryThrottler struct {
1510 max float64
1511 thresh float64
1512 ratio float64
1513
1514 mu sync.Mutex
1515 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1516}
1517
1518// throttle subtracts a retry token from the pool and returns whether a retry
1519// should be throttled (disallowed) based upon the retry throttling policy in
1520// the service config.
1521func (rt *retryThrottler) throttle() bool {
1522 if rt == nil {
1523 return false
1524 }
1525 rt.mu.Lock()
1526 defer rt.mu.Unlock()
1527 rt.tokens--
1528 if rt.tokens < 0 {
1529 rt.tokens = 0
1530 }
1531 return rt.tokens <= rt.thresh
1532}
1533
1534func (rt *retryThrottler) successfulRPC() {
1535 if rt == nil {
1536 return
1537 }
1538 rt.mu.Lock()
1539 defer rt.mu.Unlock()
1540 rt.tokens += rt.ratio
1541 if rt.tokens > rt.max {
1542 rt.tokens = rt.max
1543 }
1544}
1545
1546type channelzChannel struct {
1547 cc *ClientConn
1548}
1549
1550func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1551 return c.cc.channelzMetric()
1552}
1553
1554// ErrClientConnTimeout indicates that the ClientConn cannot establish the
1555// underlying connections within the specified timeout.
1556//
1557// Deprecated: This error is never returned by grpc and should not be
1558// referenced by users.
1559var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001560
1561func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1562 for _, rb := range cc.dopts.resolvers {
1563 if cc.parsedTarget.Scheme == rb.Scheme() {
1564 return rb
1565 }
1566 }
1567 return resolver.Get(cc.parsedTarget.Scheme)
1568}