blob: 293d2f62fd1e10588417638110b8a5b12b250bf8 [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/balancer/base"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +000038 "google.golang.org/grpc/internal/backoff"
39 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/grpcsync"
Scott Baker105df152020-04-13 15:55:14 -070041 "google.golang.org/grpc/internal/grpcutil"
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +000042 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/serviceconfig"
46 "google.golang.org/grpc/status"
47
48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minimum time to give a connection to complete
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go. updateResolverState compares
	// the current balancer name against it to decide whether addresses of type
	// resolver.GRPCLB are kept or filtered out.
	grpclbName = "grpclb"
)
59
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing; addrConn.connect
	// returns it once the addrConn has reached connectivity.Shutdown.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config. It is a plain string (not an error) used as a format prefix.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
77
// The following errors are returned from Dial and DialContext while validating
// the transport-security dial options.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that the connection is insecure
	// (grpc.WithInsecure()) but transport credentials or a credentials bundle
	// were also configured.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
95
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 // 4 MiB
	defaultClientMaxSendMessageSize    = math.MaxInt32   // effectively unbounded
	// defaultWriteBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is the read-side counterpart of defaultWriteBufSize
	// (NOTE(review): presumably the buffer size for reading frames — confirm
	// at its use site).
	defaultReadBufSize = 32 * 1024
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106 return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// Setup runs in this order:
//  1. apply opts and chain the interceptors;
//  2. register with channelz;
//  3. validate transport-security options;
//  4. parse the default service config;
//  5. install the default dialer and user agent;
//  6. choose a resolver, falling back to the default scheme when the target's
//     scheme is not registered;
//  7. compute the authority;
//  8. optionally wait for an initial service config on dopts.scChan;
//  9. build the resolver;
//  10. for a blocking dial, wait until the channel is Ready.
//
// On any returned error the partially built ClientConn is closed. If ctx is
// done by the time DialContext returns, the result is replaced with
// (nil, ctx.Err()) even if setup otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the atomic.Value with a typed nil *retryThrottler.
	// NOTE(review): presumably so later Loads can always type-assert to
	// *retryThrottler — confirm against the retry code.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is rooted at Background, not ctx: ctx only bounds setup, while
	// cc.ctx must live until the ClientConn itself is closed.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Fold dopts.unaryInt/streamInt and the chained interceptors into a single
	// interceptor of each kind, stored back into cc.dopts.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close the partially constructed ClientConn on any error. err is the
	// named result, so this also sees the ctx error substituted by the deferred
	// check registered further below (defers run last-registered-first).
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Security validation:
	//   - secure channels need exactly one of TransportCredentials or
	//     CredsBundle;
	//   - insecure channels may have neither, and no per-RPC credentials that
	//     require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		// If the parsed config is not a *ServiceConfig, defaultServiceConfig
		// is left nil.
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Default dialer: a plain net.Dialer on the network/address derived by
	// parseDialTarget, wrapped by newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the gRPC user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done when DialContext returns, report ctx.Err() instead of the
	// computed result. This defer is registered after the cancel above, so it
	// runs before that cancel. Otherwise the timeout's cancel would always make
	// ctx look done here.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Authority precedence:
	//  1. the transport credentials' ServerName;
	//  2. the authority dial option, honored only for insecure connections;
	//  3. the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep picking up later service configs for the ClientConn's lifetime.
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up only on errors that explicitly report
				// Temporary() == false; other errors keep the dial waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
331
332// chainUnaryClientInterceptors chains all unary client interceptors into one.
333func chainUnaryClientInterceptors(cc *ClientConn) {
334 interceptors := cc.dopts.chainUnaryInts
335 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
336 // be executed before any other chained interceptors.
337 if cc.dopts.unaryInt != nil {
338 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
339 }
340 var chainedInt UnaryClientInterceptor
341 if len(interceptors) == 0 {
342 chainedInt = nil
343 } else if len(interceptors) == 1 {
344 chainedInt = interceptors[0]
345 } else {
346 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
347 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
348 }
349 }
350 cc.dopts.unaryInt = chainedInt
351}
352
353// getChainUnaryInvoker recursively generate the chained unary invoker.
354func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
355 if curr == len(interceptors)-1 {
356 return finalInvoker
357 }
358 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
359 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
360 }
361}
362
363// chainStreamClientInterceptors chains all stream client interceptors into one.
364func chainStreamClientInterceptors(cc *ClientConn) {
365 interceptors := cc.dopts.chainStreamInts
366 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
367 // be executed before any other chained interceptors.
368 if cc.dopts.streamInt != nil {
369 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
370 }
371 var chainedInt StreamClientInterceptor
372 if len(interceptors) == 0 {
373 chainedInt = nil
374 } else if len(interceptors) == 1 {
375 chainedInt = interceptors[0]
376 } else {
377 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
378 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
379 }
380 }
381 cc.dopts.streamInt = chainedInt
382}
383
384// getChainStreamer recursively generate the chained client stream constructor.
385func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
386 if curr == len(interceptors)-1 {
387 return finalStreamer
388 }
389 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
390 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
391 }
392}
393
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu    sync.Mutex // guards state and notifyChan
	state connectivity.State
	// notifyChan is allocated lazily by getNotifyChan. On the next state
	// change updateState closes it, waking every waiter, and resets it to nil.
	notifyChan chan struct{}
	channelzID int64 // channel that state-change trace events are logged against
}
402
403// updateState updates the connectivity.State of ClientConn.
404// If there's a change it notifies goroutines waiting on state change to
405// happen.
406func (csm *connectivityStateManager) updateState(state connectivity.State) {
407 csm.mu.Lock()
408 defer csm.mu.Unlock()
409 if csm.state == connectivity.Shutdown {
410 return
411 }
412 if csm.state == state {
413 return
414 }
415 csm.state = state
Scott Baker105df152020-04-13 15:55:14 -0700416 channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000417 if csm.notifyChan != nil {
418 // There are other goroutines waiting on this channel.
419 close(csm.notifyChan)
420 csm.notifyChan = nil
421 }
422}
423
424func (csm *connectivityStateManager) getState() connectivity.State {
425 csm.mu.Lock()
426 defer csm.mu.Unlock()
427 return csm.state
428}
429
430func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
431 csm.mu.Lock()
432 defer csm.mu.Unlock()
433 if csm.notifyChan == nil {
434 csm.notifyChan = make(chan struct{})
435 }
436 return csm.notifyChan
437}
438
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC described by desc on the given method.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
449
// Assert *ClientConn implements ClientConnInterface (compile-time check only;
// the blank variable is never used at runtime).
var _ ClientConnInterface = (*ClientConn)(nil)
452
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx spans the lifetime of the ClientConn rather than the Dial call.
	// DialContext derives it from context.Background(), and addrConn contexts
	// are derived from it. scWatcher and waitForResolvedAddrs treat its
	// cancellation as the ClientConn going away.
	ctx    context.Context
	cancel context.CancelFunc

	target       string          // target exactly as passed to DialContext
	parsedTarget resolver.Target // parsed target, or the default-scheme fallback
	authority    string          // chosen in DialContext: creds ServerName, insecure authority option, or endpoint
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu guards resolverWrapper, sc, conns, curBalancerName and
	// balancerWrapper. conns doubles as the "closed" flag: it is nil once the
	// ClientConn has been closed.
	// NOTE(review): unclear from this file whether mkp is also guarded by mu.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler holds a *retryThrottler. It is seeded with a typed nil in
	// DialContext and accessed atomically.
	retryThrottler atomic.Value

	// firstResolveEvent fires once the first resolver update (or error) has
	// been processed; waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
493
494// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
495// ctx expires. A true value is returned in former case and false in latter.
496// This is an EXPERIMENTAL API.
497func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
498 ch := cc.csMgr.getNotifyChan()
499 if cc.csMgr.getState() != sourceState {
500 return true
501 }
502 select {
503 case <-ctx.Done():
504 return false
505 case <-ch:
506 return true
507 }
508}
509
510// GetState returns the connectivity.State of ClientConn.
511// This is an EXPERIMENTAL API.
512func (cc *ClientConn) GetState() connectivity.State {
513 return cc.csMgr.getState()
514}
515
516func (cc *ClientConn) scWatcher() {
517 for {
518 select {
519 case sc, ok := <-cc.dopts.scChan:
520 if !ok {
521 return
522 }
523 cc.mu.Lock()
524 // TODO: load balance policy runtime change is ignored.
525 // We may revisit this decision in the future.
526 cc.sc = &sc
527 cc.mu.Unlock()
528 case <-cc.ctx.Done():
529 return
530 }
531 }
532}
533
534// waitForResolvedAddrs blocks until the resolver has provided addresses or the
535// context expires. Returns nil unless the context expires first; otherwise
536// returns a status error based on the context.
537func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
538 // This is on the RPC path, so we use a fast path to avoid the
539 // more-expensive "select" below after the resolver has returned once.
540 if cc.firstResolveEvent.HasFired() {
541 return nil
542 }
543 select {
544 case <-cc.firstResolveEvent.Done():
545 return nil
546 case <-ctx.Done():
547 return status.FromContextError(ctx.Err()).Err()
548 case <-cc.ctx.Done():
549 return ErrClientConnClosing
550 }
551}
552
553var emptyServiceConfig *ServiceConfig
554
555func init() {
556 cfg := parseServiceConfig("{}")
557 if cfg.Err != nil {
558 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
559 }
560 emptyServiceConfig = cfg.Config.(*ServiceConfig)
561}
562
563func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
564 if cc.sc != nil {
565 cc.applyServiceConfigAndBalancer(cc.sc, addrs)
566 return
567 }
568 if cc.dopts.defaultServiceConfig != nil {
569 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
570 } else {
571 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
572 }
573}
574
575func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
576 defer cc.firstResolveEvent.Fire()
577 cc.mu.Lock()
578 // Check if the ClientConn is already closed. Some fields (e.g.
579 // balancerWrapper) are set to nil when closing the ClientConn, and could
580 // cause nil pointer panic if we don't have this check.
581 if cc.conns == nil {
582 cc.mu.Unlock()
583 return nil
584 }
585
586 if err != nil {
587 // May need to apply the initial service config in case the resolver
588 // doesn't support service configs, or doesn't provide a service config
589 // with the new addresses.
590 cc.maybeApplyDefaultServiceConfig(nil)
591
592 if cc.balancerWrapper != nil {
593 cc.balancerWrapper.resolverError(err)
594 }
595
596 // No addresses are valid with err set; return early.
597 cc.mu.Unlock()
598 return balancer.ErrBadResolverState
599 }
600
601 var ret error
602 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
603 cc.maybeApplyDefaultServiceConfig(s.Addresses)
604 // TODO: do we need to apply a failing LB policy if there is no
605 // default, per the error handling design?
606 } else {
607 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
608 cc.applyServiceConfigAndBalancer(sc, s.Addresses)
609 } else {
610 ret = balancer.ErrBadResolverState
611 if cc.balancerWrapper == nil {
612 var err error
613 if s.ServiceConfig.Err != nil {
614 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
615 } else {
616 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
617 }
618 cc.blockingpicker.updatePicker(base.NewErrPicker(err))
619 cc.csMgr.updateState(connectivity.TransientFailure)
620 cc.mu.Unlock()
621 return ret
622 }
623 }
624 }
625
626 var balCfg serviceconfig.LoadBalancingConfig
627 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
628 balCfg = cc.sc.lbConfig.cfg
629 }
630
631 cbn := cc.curBalancerName
632 bw := cc.balancerWrapper
633 cc.mu.Unlock()
634 if cbn != grpclbName {
635 // Filter any grpclb addresses since we don't have the grpclb balancer.
636 for i := 0; i < len(s.Addresses); {
637 if s.Addresses[i].Type == resolver.GRPCLB {
638 copy(s.Addresses[i:], s.Addresses[i+1:])
639 s.Addresses = s.Addresses[:len(s.Addresses)-1]
640 continue
641 }
642 i++
643 }
644 }
645 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
646 if ret == nil {
647 ret = uccsErr // prefer ErrBadResolver state since any other error is
648 // currently meaningless to the caller.
649 }
650 return ret
651}
652
653// switchBalancer starts the switching from current balancer to the balancer
654// with the given name.
655//
656// It will NOT send the current address list to the new balancer. If needed,
657// caller of this function should send address list to the new balancer after
658// this function returns.
659//
660// Caller must hold cc.mu.
661func (cc *ClientConn) switchBalancer(name string) {
662 if strings.EqualFold(cc.curBalancerName, name) {
663 return
664 }
665
Scott Baker105df152020-04-13 15:55:14 -0700666 channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000667 if cc.dopts.balancerBuilder != nil {
Scott Baker105df152020-04-13 15:55:14 -0700668 channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000669 return
670 }
671 if cc.balancerWrapper != nil {
672 cc.balancerWrapper.close()
673 }
674
675 builder := balancer.Get(name)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000676 if builder == nil {
Scott Baker105df152020-04-13 15:55:14 -0700677 channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
678 channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000679 builder = newPickfirstBuilder()
Scott Baker105df152020-04-13 15:55:14 -0700680 } else {
681 channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000682 }
683
684 cc.curBalancerName = builder.Name()
685 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
686}
687
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000688func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000689 cc.mu.Lock()
690 if cc.conns == nil {
691 cc.mu.Unlock()
692 return
693 }
694 // TODO(bar switching) send updates to all balancer wrappers when balancer
695 // gracefully switching is supported.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000696 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000697 cc.mu.Unlock()
698}
699
700// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
701//
702// Caller needs to make sure len(addrs) > 0.
703func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
704 ac := &addrConn{
Scott Baker105df152020-04-13 15:55:14 -0700705 state: connectivity.Idle,
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000706 cc: cc,
707 addrs: addrs,
708 scopts: opts,
709 dopts: cc.dopts,
710 czData: new(channelzData),
711 resetBackoff: make(chan struct{}),
712 }
713 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
714 // Track ac in cc. This needs to be done before any getTransport(...) is called.
715 cc.mu.Lock()
716 if cc.conns == nil {
717 cc.mu.Unlock()
718 return nil, ErrClientConnClosing
719 }
720 if channelz.IsOn() {
721 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
Scott Baker105df152020-04-13 15:55:14 -0700722 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000723 Desc: "Subchannel Created",
724 Severity: channelz.CtINFO,
725 Parent: &channelz.TraceEventDesc{
726 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
727 Severity: channelz.CtINFO,
728 },
729 })
730 }
731 cc.conns[ac] = struct{}{}
732 cc.mu.Unlock()
733 return ac, nil
734}
735
736// removeAddrConn removes the addrConn in the subConn from clientConn.
737// It also tears down the ac with the given error.
738func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
739 cc.mu.Lock()
740 if cc.conns == nil {
741 cc.mu.Unlock()
742 return
743 }
744 delete(cc.conns, ac)
745 cc.mu.Unlock()
746 ac.tearDown(err)
747}
748
749func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
750 return &channelz.ChannelInternalMetric{
751 State: cc.GetState(),
752 Target: cc.target,
753 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
754 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
755 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
756 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
757 }
758}
759
760// Target returns the target string of the ClientConn.
761// This is an EXPERIMENTAL API.
762func (cc *ClientConn) Target() string {
763 return cc.target
764}
765
766func (cc *ClientConn) incrCallsStarted() {
767 atomic.AddInt64(&cc.czData.callsStarted, 1)
768 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
769}
770
771func (cc *ClientConn) incrCallsSucceeded() {
772 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
773}
774
775func (cc *ClientConn) incrCallsFailed() {
776 atomic.AddInt64(&cc.czData.callsFailed, 1)
777}
778
779// connect starts creating a transport.
780// It does nothing if the ac is not IDLE.
781// TODO(bar) Move this to the addrConn section.
782func (ac *addrConn) connect() error {
783 ac.mu.Lock()
784 if ac.state == connectivity.Shutdown {
785 ac.mu.Unlock()
786 return errConnClosing
787 }
788 if ac.state != connectivity.Idle {
789 ac.mu.Unlock()
790 return nil
791 }
792 // Update connectivity state within the lock to prevent subsequent or
793 // concurrent calls from resetting the transport more than once.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000794 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000795 ac.mu.Unlock()
796
797 // Start a goroutine connecting to the server asynchronously.
798 go ac.resetTransport()
799 return nil
800}
801
802// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
803//
804// If ac is Connecting, it returns false. The caller should tear down the ac and
805// create a new one. Note that the backoff will be reset when this happens.
806//
807// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
808// addresses will be picked up by retry in the next iteration after backoff.
809//
810// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
811//
812// If ac is Ready, it checks whether current connected address of ac is in the
813// new addrs list.
814// - If true, it updates ac.addrs and returns true. The ac will keep using
815// the existing connection.
816// - If false, it does nothing and returns false.
817func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
818 ac.mu.Lock()
819 defer ac.mu.Unlock()
Scott Baker105df152020-04-13 15:55:14 -0700820 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000821 if ac.state == connectivity.Shutdown ||
822 ac.state == connectivity.TransientFailure ||
823 ac.state == connectivity.Idle {
824 ac.addrs = addrs
825 return true
826 }
827
828 if ac.state == connectivity.Connecting {
829 return false
830 }
831
832 // ac.state is Ready, try to find the connected address.
833 var curAddrFound bool
834 for _, a := range addrs {
835 if reflect.DeepEqual(ac.curAddr, a) {
836 curAddrFound = true
837 break
838 }
839 }
Scott Baker105df152020-04-13 15:55:14 -0700840 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000841 if curAddrFound {
842 ac.addrs = addrs
843 }
844
845 return curAddrFound
846}
847
848// GetMethodConfig gets the method config of the input method.
849// If there's an exact match for input method (i.e. /service/method), we return
850// the corresponding MethodConfig.
851// If there isn't an exact match for the input method, we look for the default config
852// under the service (i.e /service/). If there is a default MethodConfig for
853// the service, we return it.
854// Otherwise, we return an empty MethodConfig.
855func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
856 // TODO: Avoid the locking here.
857 cc.mu.RLock()
858 defer cc.mu.RUnlock()
859 if cc.sc == nil {
860 return MethodConfig{}
861 }
862 m, ok := cc.sc.Methods[method]
863 if !ok {
864 i := strings.LastIndex(method, "/")
865 m = cc.sc.Methods[method[:i+1]]
866 }
867 return m
868}
869
870func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
871 cc.mu.RLock()
872 defer cc.mu.RUnlock()
873 if cc.sc == nil {
874 return nil
875 }
876 return cc.sc.healthCheckConfig
877}
878
879func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000880 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
881 Ctx: ctx,
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000882 FullMethodName: method,
883 })
884 if err != nil {
885 return nil, nil, toRPCErr(err)
886 }
887 return t, done, nil
888}
889
890func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
891 if sc == nil {
892 // should never reach here.
893 return
894 }
895 cc.sc = sc
896
897 if cc.sc.retryThrottling != nil {
898 newThrottler := &retryThrottler{
899 tokens: cc.sc.retryThrottling.MaxTokens,
900 max: cc.sc.retryThrottling.MaxTokens,
901 thresh: cc.sc.retryThrottling.MaxTokens / 2,
902 ratio: cc.sc.retryThrottling.TokenRatio,
903 }
904 cc.retryThrottler.Store(newThrottler)
905 } else {
906 cc.retryThrottler.Store((*retryThrottler)(nil))
907 }
908
909 if cc.dopts.balancerBuilder == nil {
910 // Only look at balancer types and switch balancer if balancer dial
911 // option is not set.
912 var newBalancerName string
913 if cc.sc != nil && cc.sc.lbConfig != nil {
914 newBalancerName = cc.sc.lbConfig.name
915 } else {
916 var isGRPCLB bool
917 for _, a := range addrs {
918 if a.Type == resolver.GRPCLB {
919 isGRPCLB = true
920 break
921 }
922 }
923 if isGRPCLB {
924 newBalancerName = grpclbName
925 } else if cc.sc != nil && cc.sc.LB != nil {
926 newBalancerName = *cc.sc.LB
927 } else {
928 newBalancerName = PickFirstBalancerName
929 }
930 }
931 cc.switchBalancer(newBalancerName)
932 } else if cc.balancerWrapper == nil {
933 // Balancer dial option was set, and this is the first time handling
934 // resolved addresses. Build a balancer with dopts.balancerBuilder.
935 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
936 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
937 }
938}
939
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000940func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000941 cc.mu.RLock()
942 r := cc.resolverWrapper
943 cc.mu.RUnlock()
944 if r == nil {
945 return
946 }
947 go r.resolveNow(o)
948}
949
950// ResetConnectBackoff wakes up all subchannels in transient failure and causes
951// them to attempt another connection immediately. It also resets the backoff
952// times used for subsequent attempts regardless of the current state.
953//
954// In general, this function should not be used. Typical service or network
955// outages result in a reasonable client reconnection strategy by default.
956// However, if a previously unavailable network becomes available, this may be
957// used to trigger an immediate reconnect.
958//
959// This API is EXPERIMENTAL.
960func (cc *ClientConn) ResetConnectBackoff() {
961 cc.mu.Lock()
962 conns := cc.conns
963 cc.mu.Unlock()
964 for ac := range conns {
965 ac.resetConnectBackoff()
966 }
967}
968
// Close tears down the ClientConn and all underlying connections.
//
// Only the first call does any work. Later calls (detected by cc.conns being
// nil) return ErrClientConnClosing. The first call marks the channel
// Shutdown, detaches and closes the resolver and balancer wrappers, closes
// the blocking picker, tears down every addrConn with ErrClientConnClosing,
// and finally removes the channel from channelz. cc.cancel runs on every
// path via defer.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	// A nil conns map marks a ClientConn that has already been closed.
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the wrappers while holding cc.mu, but close them only after it
	// is released (NOTE(review): presumably because closing may call back
	// into cc and take cc.mu).
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// conns was detached from cc above, so it is ranged over without cc.mu.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// A nested channel also records the deletion on its parent.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1018
// addrConn is a network connection to a given address.
//
// It backs the balancer SubConn stored in acbw. resetTransport drives its
// connection loop, and tearDown shuts it down (setting state to Shutdown and
// calling cancel).
type addrConn struct {
	// ctx is cancelled by tearDown via cancel. resetTransport stops when it
	// is done, and transports and health checks are created under it.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu protects ac's mutable state. In this file, transport, curAddr,
	// addrs, state, backoffIdx and resetBackoff are accessed with mu held.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed by resetConnectBackoff to cut short a backoff
	// wait in resetTransport, then replaced with a fresh channel.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
1048
1049// Note: this requires a lock on ac.mu.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001050func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001051 if ac.state == s {
1052 return
1053 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001054 ac.state = s
Scott Baker105df152020-04-13 15:55:14 -07001055 channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001056 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001057}
1058
1059// adjustParams updates parameters used to create transports upon
1060// receiving a GoAway.
1061func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1062 switch r {
1063 case transport.GoAwayTooManyPings:
1064 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1065 ac.cc.mu.Lock()
1066 if v > ac.cc.mkp.Time {
1067 ac.cc.mkp.Time = v
1068 }
1069 ac.cc.mu.Unlock()
1070 }
1071}
1072
// resetTransport is ac's connection loop. It is run on its own goroutine
// (`go ac.resetTransport()`), and it returns only when ac is found to be
// Shutdown or ac.ctx is done during a backoff wait.
//
// Each iteration moves ac to Connecting and tries every address with a dial
// deadline of max(minConnectTimeout, current backoff). If every address
// fails, ac moves to TransientFailure and waits out the backoff before
// retrying. On success it installs the transport, starts health checking
// (which moves ac toward Ready), then blocks until the transport disconnects
// and starts over.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Every attempt after the first asks the resolver to re-resolve.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// Only a backoff that runs to completion advances to the
				// next (longer) backoff step.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// resetConnectBackoff closed b (and zeroed backoffIdx):
				// retry immediately.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		// ac may have been torn down while dialing; discard the new transport.
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// hctx bounds the health check for this transport only.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING. This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1169
1170// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1171// first successful one. It returns the transport, the address and a Event in
1172// the successful case. The Event fires when the returned transport disconnects.
1173func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001174 var firstConnErr error
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001175 for _, addr := range addrs {
1176 ac.mu.Lock()
1177 if ac.state == connectivity.Shutdown {
1178 ac.mu.Unlock()
1179 return nil, resolver.Address{}, nil, errConnClosing
1180 }
1181
1182 ac.cc.mu.RLock()
1183 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1184 ac.cc.mu.RUnlock()
1185
1186 copts := ac.dopts.copts
1187 if ac.scopts.CredsBundle != nil {
1188 copts.CredsBundle = ac.scopts.CredsBundle
1189 }
1190 ac.mu.Unlock()
1191
Scott Baker105df152020-04-13 15:55:14 -07001192 channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001193
1194 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1195 if err == nil {
1196 return newTr, addr, reconnect, nil
1197 }
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001198 if firstConnErr == nil {
1199 firstConnErr = err
1200 }
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001201 ac.cc.blockingpicker.updateConnectionError(err)
1202 }
1203
1204 // Couldn't connect to any address.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001205 return nil, resolver.Address{}, nil, firstConnErr
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001206}
1207
1208// createTransport creates a connection to addr. It returns the transport and a
1209// Event in the successful case. The Event fires when the returned transport
1210// disconnects.
1211func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1212 prefaceReceived := make(chan struct{})
1213 onCloseCalled := make(chan struct{})
1214 reconnect := grpcsync.NewEvent()
1215
1216 authority := ac.cc.authority
1217 // addr.ServerName takes precedent over ClientConn authority, if present.
1218 if addr.ServerName != "" {
1219 authority = addr.ServerName
1220 }
1221
1222 target := transport.TargetInfo{
1223 Addr: addr.Addr,
1224 Metadata: addr.Metadata,
1225 Authority: authority,
1226 }
1227
1228 once := sync.Once{}
1229 onGoAway := func(r transport.GoAwayReason) {
1230 ac.mu.Lock()
1231 ac.adjustParams(r)
1232 once.Do(func() {
1233 if ac.state == connectivity.Ready {
1234 // Prevent this SubConn from being used for new RPCs by setting its
1235 // state to Connecting.
1236 //
1237 // TODO: this should be Idle when grpc-go properly supports it.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001238 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001239 }
1240 })
1241 ac.mu.Unlock()
1242 reconnect.Fire()
1243 }
1244
1245 onClose := func() {
1246 ac.mu.Lock()
1247 once.Do(func() {
1248 if ac.state == connectivity.Ready {
1249 // Prevent this SubConn from being used for new RPCs by setting its
1250 // state to Connecting.
1251 //
1252 // TODO: this should be Idle when grpc-go properly supports it.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001253 ac.updateConnectivityState(connectivity.Connecting, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001254 }
1255 })
1256 ac.mu.Unlock()
1257 close(onCloseCalled)
1258 reconnect.Fire()
1259 }
1260
1261 onPrefaceReceipt := func() {
1262 close(prefaceReceived)
1263 }
1264
1265 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1266 defer cancel()
1267 if channelz.IsOn() {
1268 copts.ChannelzParentID = ac.channelzID
1269 }
1270
1271 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1272 if err != nil {
1273 // newTr is either nil, or closed.
Scott Baker105df152020-04-13 15:55:14 -07001274 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001275 return nil, nil, err
1276 }
1277
1278 select {
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001279 case <-time.After(time.Until(connectDeadline)):
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001280 // We didn't get the preface in time.
1281 newTr.Close()
Scott Baker105df152020-04-13 15:55:14 -07001282 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001283 return nil, nil, errors.New("timed out waiting for server handshake")
1284 case <-prefaceReceived:
1285 // We got the preface - huzzah! things are good.
1286 case <-onCloseCalled:
1287 // The transport has already closed - noop.
1288 return nil, nil, errors.New("connection closed")
1289 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1290 }
1291 return newTr, reconnect, nil
1292}
1293
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
// Otherwise the health check function owns ac's connectivity state from then
// on, through setConnectivityState below.
//
// ctx is passed to the health check function and to every stream it opens.
// resetTransport cancels it once the transport disconnects.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// Once healthcheckManagingState is true, the deferred Ready transition
	// is skipped. Every early return below leaves it false.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers are pinned to the transport that is current right now.
	// Once ac.transport changes, they refuse to open streams or update state,
	// so a stale health check cannot affect a newer connection.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	// NOTE(review): this re-reads ac.cc.dopts.healthCheckFunc rather than the
	// local healthCheckFunc checked above; presumably the same value.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1366
1367func (ac *addrConn) resetConnectBackoff() {
1368 ac.mu.Lock()
1369 close(ac.resetBackoff)
1370 ac.backoffIdx = 0
1371 ac.resetBackoff = make(chan struct{})
1372 ac.mu.Unlock()
1373}
1374
1375// getReadyTransport returns the transport if ac's state is READY.
1376// Otherwise it returns nil, false.
1377// If ac's state is IDLE, it will trigger ac to connect.
1378func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1379 ac.mu.Lock()
1380 if ac.state == connectivity.Ready && ac.transport != nil {
1381 t := ac.transport
1382 ac.mu.Unlock()
1383 return t, true
1384 }
1385 var idle bool
1386 if ac.state == connectivity.Idle {
1387 idle = true
1388 }
1389 ac.mu.Unlock()
1390 // Trigger idle ac to connect.
1391 if idle {
1392 ac.connect()
1393 }
1394 return nil, false
1395}
1396
1397// tearDown starts to tear down the addrConn.
1398// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1399// some edge cases (e.g., the caller opens and closes many addrConn's in a
1400// tight loop.
1401// tearDown doesn't remove ac from ac.cc.conns.
1402func (ac *addrConn) tearDown(err error) {
1403 ac.mu.Lock()
1404 if ac.state == connectivity.Shutdown {
1405 ac.mu.Unlock()
1406 return
1407 }
1408 curTr := ac.transport
1409 ac.transport = nil
1410 // We have to set the state to Shutdown before anything else to prevent races
1411 // between setting the state and logic that waits on context cancellation / etc.
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001412 ac.updateConnectivityState(connectivity.Shutdown, nil)
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001413 ac.cancel()
1414 ac.curAddr = resolver.Address{}
1415 if err == errConnDrain && curTr != nil {
1416 // GracefulClose(...) may be executed multiple times when
1417 // i) receiving multiple GoAway frames from the server; or
1418 // ii) there are concurrent name resolver/Balancer triggered
1419 // address removal and GoAway.
1420 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1421 ac.mu.Unlock()
1422 curTr.GracefulClose()
1423 ac.mu.Lock()
1424 }
1425 if channelz.IsOn() {
Scott Baker105df152020-04-13 15:55:14 -07001426 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001427 Desc: "Subchannel Deleted",
1428 Severity: channelz.CtINFO,
1429 Parent: &channelz.TraceEventDesc{
1430 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1431 Severity: channelz.CtINFO,
1432 },
1433 })
1434 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1435 // the entity being deleted, and thus prevent it from being deleted right away.
1436 channelz.RemoveEntry(ac.channelzID)
1437 }
1438 ac.mu.Unlock()
1439}
1440
1441func (ac *addrConn) getState() connectivity.State {
1442 ac.mu.Lock()
1443 defer ac.mu.Unlock()
1444 return ac.state
1445}
1446
1447func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1448 ac.mu.Lock()
1449 addr := ac.curAddr.Addr
1450 ac.mu.Unlock()
1451 return &channelz.ChannelInternalMetric{
1452 State: ac.getState(),
1453 Target: addr,
1454 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1455 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1456 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1457 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1458 }
1459}
1460
1461func (ac *addrConn) incrCallsStarted() {
1462 atomic.AddInt64(&ac.czData.callsStarted, 1)
1463 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1464}
1465
1466func (ac *addrConn) incrCallsSucceeded() {
1467 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1468}
1469
1470func (ac *addrConn) incrCallsFailed() {
1471 atomic.AddInt64(&ac.czData.callsFailed, 1)
1472}
1473
// retryThrottler holds the token bucket behind the service config's retry
// throttling policy. A nil *retryThrottler means throttling is disabled, and
// both methods are no-ops on it.
type retryThrottler struct {
	max    float64 // bucket capacity
	thresh float64 // retries are throttled at or below this many tokens
	ratio  float64 // tokens credited per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle spends one token (the balance never drops below zero) and reports
// whether a retry should be throttled, i.e. disallowed, because the remaining
// balance is at or below the threshold.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	left := rt.tokens - 1
	if left < 0 {
		left = 0
	}
	rt.tokens = left
	return left <= rt.thresh
}

// successfulRPC credits ratio tokens for a successful RPC, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1510
1511type channelzChannel struct {
1512 cc *ClientConn
1513}
1514
1515func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1516 return c.cc.channelzMetric()
1517}
1518
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)
Dinesh Belwalkar396b6522020-02-06 22:11:53 +00001525
1526func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1527 for _, rb := range cc.dopts.resolvers {
1528 if cc.parsedTarget.Scheme == rb.Scheme() {
1529 return rb
1530 }
1531 }
1532 return resolver.Get(cc.parsedTarget.Scheme)
1533}