/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)

const (
	// minConnectTimeout is the minimum amount of time we give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection is being drained and no longer accepts new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that no transport security has been
	// set for the ClientConn. Users should either set one or explicitly
	// use the WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that the user wants to transmit
	// security information (e.g., an OAuth2 token) that requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize specify the default buffer
	// sizes the transport uses for writing and reading frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
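// The defaults above can be overridden per connection with dial options; the
// following is a hedged sketch only (the option values and target are
// illustrative, not recommendations):
//
//	conn, err := grpc.Dial(target,
//		grpc.WithInsecure(),
//		grpc.WithWriteBufferSize(64*1024),
//		grpc.WithReadBufferSize(64*1024),
//		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(16*1024*1024)),
//	)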

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default,
// it's a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a
// blocking dial, use the WithBlock() dial option.
//
// In the non-blocking case, the ctx does not affect the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be a no-op. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use the dns resolver, apply a "dns:///" prefix to the target.
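//
// A minimal usage sketch (hedged; the address and one-second timeout below are
// illustrative placeholders, not values required by this API):
//
//	// Non-blocking dial: returns immediately, connecting happens in the background.
//	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
//	if err != nil {
//		log.Fatalf("dial failed: %v", err)
//	}
//	defer conn.Close()
//
//	// Blocking dial: waits, up to ctx's deadline, until the connection is ready.
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	conn2, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithInsecure(), grpc.WithBlock())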
125func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
126 cc := &ClientConn{
127 target: target,
128 csMgr: &connectivityStateManager{},
129 conns: make(map[*addrConn]struct{}),
130 dopts: defaultDialOptions(),
131 blockingpicker: newPickerWrapper(),
132 czData: new(channelzData),
133 firstResolveEvent: grpcsync.NewEvent(),
134 }
135 cc.retryThrottler.Store((*retryThrottler)(nil))
136 cc.ctx, cc.cancel = context.WithCancel(context.Background())
137
138 for _, opt := range opts {
139 opt.apply(&cc.dopts)
140 }
141
142 chainUnaryClientInterceptors(cc)
143 chainStreamClientInterceptors(cc)
144
145 defer func() {
146 if err != nil {
147 cc.Close()
148 }
149 }()
150
151 if channelz.IsOn() {
152 if cc.dopts.channelzParentID != 0 {
153 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
154 channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
155 Desc: "Channel Created",
156 Severity: channelz.CtINFO,
157 Parent: &channelz.TraceEventDesc{
158 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
159 Severity: channelz.CtINFO,
160 },
161 })
162 } else {
163 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
164 channelz.Info(cc.channelzID, "Channel Created")
165 }
166 cc.csMgr.channelzID = cc.channelzID
167 }
168
169 if !cc.dopts.insecure {
170 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
171 return nil, errNoTransportSecurity
172 }
173 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
174 return nil, errTransportCredsAndBundle
175 }
176 } else {
177 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
178 return nil, errCredentialsConflict
179 }
180 for _, cd := range cc.dopts.copts.PerRPCCredentials {
181 if cd.RequireTransportSecurity() {
182 return nil, errTransportCredentialsMissing
183 }
184 }
185 }
186
187 if cc.dopts.defaultServiceConfigRawJSON != nil {
188 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
189 if scpr.Err != nil {
190 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
191 }
192 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
193 }
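	// Illustration only (a hedged sketch, not part of this function's logic):
	// the raw JSON checked above typically comes from the
	// WithDefaultServiceConfig dial option, e.g.
	//
	//	grpc.Dial(target, grpc.WithInsecure(),
	//		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))
	//
	// An invalid JSON string makes DialContext fail with an error prefixed by
	// invalidDefaultServiceConfigErrPrefix.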
194 cc.mkp = cc.dopts.copts.KeepaliveParams
195
196 if cc.dopts.copts.Dialer == nil {
197 cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
198 network, addr := parseDialTarget(addr)
199 return (&net.Dialer{}).DialContext(ctx, network, addr)
200 }
201 if cc.dopts.withProxy {
202 cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
203 }
204 }
205
206 if cc.dopts.copts.UserAgent != "" {
207 cc.dopts.copts.UserAgent += " " + grpcUA
208 } else {
209 cc.dopts.copts.UserAgent = grpcUA
210 }
211
212 if cc.dopts.timeout > 0 {
213 var cancel context.CancelFunc
214 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
215 defer cancel()
216 }
217 defer func() {
218 select {
219 case <-ctx.Done():
220 conn, err = nil, ctx.Err()
221 default:
222 }
223 }()
224
225 scSet := false
226 if cc.dopts.scChan != nil {
227 // Try to get an initial service config.
228 select {
229 case sc, ok := <-cc.dopts.scChan:
230 if ok {
231 cc.sc = &sc
232 scSet = true
233 }
234 default:
235 }
236 }
237 if cc.dopts.bs == nil {
238 cc.dopts.bs = backoff.DefaultExponential
239 }
240
241 // Determine the resolver to use.
242 cc.parsedTarget = grpcutil.ParseTarget(cc.target)
243 channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
244 resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
245 if resolverBuilder == nil {
246 // If resolver builder is still nil, the parsed target's scheme is
247 // not registered. Fallback to default resolver and set Endpoint to
248 // the original target.
249 channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
250 cc.parsedTarget = resolver.Target{
251 Scheme: resolver.GetDefaultScheme(),
252 Endpoint: target,
253 }
254 resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
255 if resolverBuilder == nil {
256 return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
257 }
258 }
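	// Illustration (hedged; exact behavior depends on which resolvers are
	// registered): a target such as "dns:///example.com:443" parses to Scheme
	// "dns" and Endpoint "example.com:443", while "example.com:443" has no
	// registered scheme and falls back here to the default scheme (normally
	// "passthrough") with Endpoint set to the original target string.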
259
260 creds := cc.dopts.copts.TransportCredentials
261 if creds != nil && creds.Info().ServerName != "" {
262 cc.authority = creds.Info().ServerName
263 } else if cc.dopts.insecure && cc.dopts.authority != "" {
264 cc.authority = cc.dopts.authority
265 } else {
266 // Use endpoint from "scheme://authority/endpoint" as the default
267 // authority for ClientConn.
268 cc.authority = cc.parsedTarget.Endpoint
269 }
270
271 if cc.dopts.scChan != nil && !scSet {
272 // Blocking wait for the initial service config.
273 select {
274 case sc, ok := <-cc.dopts.scChan:
275 if ok {
276 cc.sc = &sc
277 }
278 case <-ctx.Done():
279 return nil, ctx.Err()
280 }
281 }
282 if cc.dopts.scChan != nil {
283 go cc.scWatcher()
284 }
285
286 var credsClone credentials.TransportCredentials
287 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
288 credsClone = creds.Clone()
289 }
290 cc.balancerBuildOpts = balancer.BuildOptions{
291 DialCreds: credsClone,
292 CredsBundle: cc.dopts.copts.CredsBundle,
293 Dialer: cc.dopts.copts.Dialer,
294 ChannelzParentID: cc.channelzID,
295 Target: cc.parsedTarget,
296 }
297
298 // Build the resolver.
299 rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
300 if err != nil {
301 return nil, fmt.Errorf("failed to build resolver: %v", err)
302 }
303 cc.mu.Lock()
304 cc.resolverWrapper = rWrapper
305 cc.mu.Unlock()
306
307 // A blocking dial blocks until the clientConn is ready.
308 if cc.dopts.block {
309 for {
310 s := cc.GetState()
311 if s == connectivity.Ready {
312 break
313 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
314 if err = cc.blockingpicker.connectionError(); err != nil {
315 terr, ok := err.(interface {
316 Temporary() bool
317 })
318 if ok && !terr.Temporary() {
319 return nil, err
320 }
321 }
322 }
323 if !cc.WaitForStateChange(ctx, s) {
324 // ctx got timeout or canceled.
325 return nil, ctx.Err()
326 }
327 }
328 }
329
330 return cc, nil
331}
332
333// chainUnaryClientInterceptors chains all unary client interceptors into one.
334func chainUnaryClientInterceptors(cc *ClientConn) {
335 interceptors := cc.dopts.chainUnaryInts
336 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
337 // be executed before any other chained interceptors.
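	// For example (a hedged sketch; u, i1 and i2 are hypothetical interceptors):
	// with WithUnaryInterceptor(u) and WithChainUnaryInterceptor(i1, i2), a
	// unary RPC is processed as u -> i1 -> i2 -> invoker, i.e. u runs first.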
338 if cc.dopts.unaryInt != nil {
339 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
340 }
341 var chainedInt UnaryClientInterceptor
342 if len(interceptors) == 0 {
343 chainedInt = nil
344 } else if len(interceptors) == 1 {
345 chainedInt = interceptors[0]
346 } else {
347 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
348 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
349 }
350 }
351 cc.dopts.unaryInt = chainedInt
352}
353
// getChainUnaryInvoker recursively generates the chained unary invoker.
355func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
356 if curr == len(interceptors)-1 {
357 return finalInvoker
358 }
359 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
360 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
361 }
362}
363
364// chainStreamClientInterceptors chains all stream client interceptors into one.
365func chainStreamClientInterceptors(cc *ClientConn) {
366 interceptors := cc.dopts.chainStreamInts
367 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
368 // be executed before any other chained interceptors.
369 if cc.dopts.streamInt != nil {
370 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
371 }
372 var chainedInt StreamClientInterceptor
373 if len(interceptors) == 0 {
374 chainedInt = nil
375 } else if len(interceptors) == 1 {
376 chainedInt = interceptors[0]
377 } else {
378 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
379 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
380 }
381 }
382 cc.dopts.streamInt = chainedInt
383}
384
// getChainStreamer recursively generates the chained client stream constructor.
386func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
387 if curr == len(interceptors)-1 {
388 return finalStreamer
389 }
390 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
391 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
392 }
393}
394
395// connectivityStateManager keeps the connectivity.State of ClientConn.
396// This struct will eventually be exported so the balancers can access it.
397type connectivityStateManager struct {
398 mu sync.Mutex
399 state connectivity.State
400 notifyChan chan struct{}
401 channelzID int64
402}
403
// updateState updates the connectivity.State of ClientConn.
// If there is a change, it notifies goroutines waiting on a state change.
407func (csm *connectivityStateManager) updateState(state connectivity.State) {
408 csm.mu.Lock()
409 defer csm.mu.Unlock()
410 if csm.state == connectivity.Shutdown {
411 return
412 }
413 if csm.state == state {
414 return
415 }
416 csm.state = state
417 channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
418 if csm.notifyChan != nil {
419 // There are other goroutines waiting on this channel.
420 close(csm.notifyChan)
421 csm.notifyChan = nil
422 }
423}
424
425func (csm *connectivityStateManager) getState() connectivity.State {
426 csm.mu.Lock()
427 defer csm.mu.Unlock()
428 return csm.state
429}
430
431func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
432 csm.mu.Lock()
433 defer csm.mu.Unlock()
434 if csm.notifyChan == nil {
435 csm.notifyChan = make(chan struct{})
436 }
437 return csm.notifyChan
438}
439
440// ClientConnInterface defines the functions clients need to perform unary and
441// streaming RPCs. It is implemented by *ClientConn, and is only intended to
442// be referenced by generated code.
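//
// A hedged sketch of how generated code typically uses it (the service and
// method names are illustrative only):
//
//	func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
//		out := new(HelloReply)
//		if err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...); err != nil {
//			return nil, err
//		}
//		return out, nil
//	}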
443type ClientConnInterface interface {
444 // Invoke performs a unary RPC and returns after the response is received
445 // into reply.
446 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
447 // NewStream begins a streaming RPC.
448 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
449}
450
451// Assert *ClientConn implements ClientConnInterface.
452var _ ClientConnInterface = (*ClientConn)(nil)
453
454// ClientConn represents a virtual connection to a conceptual endpoint, to
455// perform RPCs.
456//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change them for every RPC, permitting client-side
// load balancing.
461//
462// A ClientConn encapsulates a range of functionality including name
463// resolution, TCP connection establishment (with retries and backoff) and TLS
464// handshakes. It also handles errors on established connections by
465// re-resolving the name and reconnecting.
466type ClientConn struct {
467 ctx context.Context
468 cancel context.CancelFunc
469
470 target string
471 parsedTarget resolver.Target
472 authority string
473 dopts dialOptions
474 csMgr *connectivityStateManager
475
476 balancerBuildOpts balancer.BuildOptions
477 blockingpicker *pickerWrapper
478
479 mu sync.RWMutex
480 resolverWrapper *ccResolverWrapper
481 sc *ServiceConfig
482 conns map[*addrConn]struct{}
483 // Keepalive parameter can be updated if a GoAway is received.
484 mkp keepalive.ClientParameters
485 curBalancerName string
486 balancerWrapper *ccBalancerWrapper
487 retryThrottler atomic.Value
488
489 firstResolveEvent *grpcsync.Event
490
491 channelzID int64 // channelz unique identification number
492 czData *channelzData
493}
494
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in the former case and false in the latter.
// This is an EXPERIMENTAL API.
498func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
499 ch := cc.csMgr.getNotifyChan()
500 if cc.csMgr.getState() != sourceState {
501 return true
502 }
503 select {
504 case <-ctx.Done():
505 return false
506 case <-ch:
507 return true
508 }
509}
510
511// GetState returns the connectivity.State of ClientConn.
512// This is an EXPERIMENTAL API.
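//
// A hedged sketch of waiting for readiness with GetState and
// WaitForStateChange (conn is assumed to be a *ClientConn returned by Dial,
// and the timeout is an illustrative placeholder):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//	for {
//		s := conn.GetState()
//		if s == connectivity.Ready {
//			break
//		}
//		if !conn.WaitForStateChange(ctx, s) {
//			// ctx expired before the channel became ready.
//			break
//		}
//	}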
513func (cc *ClientConn) GetState() connectivity.State {
514 return cc.csMgr.getState()
515}
516
517func (cc *ClientConn) scWatcher() {
518 for {
519 select {
520 case sc, ok := <-cc.dopts.scChan:
521 if !ok {
522 return
523 }
524 cc.mu.Lock()
525 // TODO: load balance policy runtime change is ignored.
526 // We may revisit this decision in the future.
527 cc.sc = &sc
528 cc.mu.Unlock()
529 case <-cc.ctx.Done():
530 return
531 }
532 }
533}
534
535// waitForResolvedAddrs blocks until the resolver has provided addresses or the
536// context expires. Returns nil unless the context expires first; otherwise
537// returns a status error based on the context.
538func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
539 // This is on the RPC path, so we use a fast path to avoid the
540 // more-expensive "select" below after the resolver has returned once.
541 if cc.firstResolveEvent.HasFired() {
542 return nil
543 }
544 select {
545 case <-cc.firstResolveEvent.Done():
546 return nil
547 case <-ctx.Done():
548 return status.FromContextError(ctx.Err()).Err()
549 case <-cc.ctx.Done():
550 return ErrClientConnClosing
551 }
552}
553
554var emptyServiceConfig *ServiceConfig
555
556func init() {
557 cfg := parseServiceConfig("{}")
558 if cfg.Err != nil {
559 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
560 }
561 emptyServiceConfig = cfg.Config.(*ServiceConfig)
562}
563
564func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
565 if cc.sc != nil {
566 cc.applyServiceConfigAndBalancer(cc.sc, addrs)
567 return
568 }
569 if cc.dopts.defaultServiceConfig != nil {
570 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
571 } else {
572 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
573 }
574}
575
576func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
577 defer cc.firstResolveEvent.Fire()
578 cc.mu.Lock()
579 // Check if the ClientConn is already closed. Some fields (e.g.
580 // balancerWrapper) are set to nil when closing the ClientConn, and could
581 // cause nil pointer panic if we don't have this check.
582 if cc.conns == nil {
583 cc.mu.Unlock()
584 return nil
585 }
586
587 if err != nil {
588 // May need to apply the initial service config in case the resolver
589 // doesn't support service configs, or doesn't provide a service config
590 // with the new addresses.
591 cc.maybeApplyDefaultServiceConfig(nil)
592
593 if cc.balancerWrapper != nil {
594 cc.balancerWrapper.resolverError(err)
595 }
596
597 // No addresses are valid with err set; return early.
598 cc.mu.Unlock()
599 return balancer.ErrBadResolverState
600 }
601
602 var ret error
603 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
604 cc.maybeApplyDefaultServiceConfig(s.Addresses)
605 // TODO: do we need to apply a failing LB policy if there is no
606 // default, per the error handling design?
607 } else {
608 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
609 cc.applyServiceConfigAndBalancer(sc, s.Addresses)
610 } else {
611 ret = balancer.ErrBadResolverState
612 if cc.balancerWrapper == nil {
613 var err error
614 if s.ServiceConfig.Err != nil {
615 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
616 } else {
617 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
618 }
619 cc.blockingpicker.updatePicker(base.NewErrPicker(err))
620 cc.csMgr.updateState(connectivity.TransientFailure)
621 cc.mu.Unlock()
622 return ret
623 }
624 }
625 }
626
627 var balCfg serviceconfig.LoadBalancingConfig
628 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
629 balCfg = cc.sc.lbConfig.cfg
630 }
631
632 cbn := cc.curBalancerName
633 bw := cc.balancerWrapper
634 cc.mu.Unlock()
635 if cbn != grpclbName {
636 // Filter any grpclb addresses since we don't have the grpclb balancer.
637 for i := 0; i < len(s.Addresses); {
638 if s.Addresses[i].Type == resolver.GRPCLB {
639 copy(s.Addresses[i:], s.Addresses[i+1:])
640 s.Addresses = s.Addresses[:len(s.Addresses)-1]
641 continue
642 }
643 i++
644 }
645 }
646 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
647 if ret == nil {
		ret = uccsErr // prefer ErrBadResolverState since any other error is
		// currently meaningless to the caller.
650 }
651 return ret
652}
653
654// switchBalancer starts the switching from current balancer to the balancer
655// with the given name.
656//
657// It will NOT send the current address list to the new balancer. If needed,
658// caller of this function should send address list to the new balancer after
659// this function returns.
660//
661// Caller must hold cc.mu.
662func (cc *ClientConn) switchBalancer(name string) {
663 if strings.EqualFold(cc.curBalancerName, name) {
664 return
665 }
666
667 channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
668 if cc.dopts.balancerBuilder != nil {
669 channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
670 return
671 }
672 if cc.balancerWrapper != nil {
673 cc.balancerWrapper.close()
674 }
675
676 builder := balancer.Get(name)
677 if builder == nil {
678 channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
679 channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
680 builder = newPickfirstBuilder()
681 } else {
682 channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
683 }
684
685 cc.curBalancerName = builder.Name()
686 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
687}
688
689func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
690 cc.mu.Lock()
691 if cc.conns == nil {
692 cc.mu.Unlock()
693 return
694 }
695 // TODO(bar switching) send updates to all balancer wrappers when balancer
696 // gracefully switching is supported.
697 cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
698 cc.mu.Unlock()
699}
700
701// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
702//
703// Caller needs to make sure len(addrs) > 0.
704func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
705 ac := &addrConn{
706 state: connectivity.Idle,
707 cc: cc,
708 addrs: addrs,
709 scopts: opts,
710 dopts: cc.dopts,
711 czData: new(channelzData),
712 resetBackoff: make(chan struct{}),
713 }
714 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
715 // Track ac in cc. This needs to be done before any getTransport(...) is called.
716 cc.mu.Lock()
717 if cc.conns == nil {
718 cc.mu.Unlock()
719 return nil, ErrClientConnClosing
720 }
721 if channelz.IsOn() {
722 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
723 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
724 Desc: "Subchannel Created",
725 Severity: channelz.CtINFO,
726 Parent: &channelz.TraceEventDesc{
727 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
728 Severity: channelz.CtINFO,
729 },
730 })
731 }
732 cc.conns[ac] = struct{}{}
733 cc.mu.Unlock()
734 return ac, nil
735}
736
737// removeAddrConn removes the addrConn in the subConn from clientConn.
738// It also tears down the ac with the given error.
739func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
740 cc.mu.Lock()
741 if cc.conns == nil {
742 cc.mu.Unlock()
743 return
744 }
745 delete(cc.conns, ac)
746 cc.mu.Unlock()
747 ac.tearDown(err)
748}
749
750func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
751 return &channelz.ChannelInternalMetric{
752 State: cc.GetState(),
753 Target: cc.target,
754 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
755 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
756 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
757 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
758 }
759}
760
761// Target returns the target string of the ClientConn.
762// This is an EXPERIMENTAL API.
763func (cc *ClientConn) Target() string {
764 return cc.target
765}
766
767func (cc *ClientConn) incrCallsStarted() {
768 atomic.AddInt64(&cc.czData.callsStarted, 1)
769 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
770}
771
772func (cc *ClientConn) incrCallsSucceeded() {
773 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
774}
775
776func (cc *ClientConn) incrCallsFailed() {
777 atomic.AddInt64(&cc.czData.callsFailed, 1)
778}
779
780// connect starts creating a transport.
781// It does nothing if the ac is not IDLE.
782// TODO(bar) Move this to the addrConn section.
783func (ac *addrConn) connect() error {
784 ac.mu.Lock()
785 if ac.state == connectivity.Shutdown {
786 ac.mu.Unlock()
787 return errConnClosing
788 }
789 if ac.state != connectivity.Idle {
790 ac.mu.Unlock()
791 return nil
792 }
793 // Update connectivity state within the lock to prevent subsequent or
794 // concurrent calls from resetting the transport more than once.
795 ac.updateConnectivityState(connectivity.Connecting, nil)
796 ac.mu.Unlock()
797
798 // Start a goroutine connecting to the server asynchronously.
799 go ac.resetTransport()
800 return nil
801}
802
803// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
804//
805// If ac is Connecting, it returns false. The caller should tear down the ac and
806// create a new one. Note that the backoff will be reset when this happens.
807//
808// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
809// addresses will be picked up by retry in the next iteration after backoff.
810//
811// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
812//
813// If ac is Ready, it checks whether current connected address of ac is in the
814// new addrs list.
815// - If true, it updates ac.addrs and returns true. The ac will keep using
816// the existing connection.
817// - If false, it does nothing and returns false.
818func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
819 ac.mu.Lock()
820 defer ac.mu.Unlock()
821 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
822 if ac.state == connectivity.Shutdown ||
823 ac.state == connectivity.TransientFailure ||
824 ac.state == connectivity.Idle {
825 ac.addrs = addrs
826 return true
827 }
828
829 if ac.state == connectivity.Connecting {
830 return false
831 }
832
833 // ac.state is Ready, try to find the connected address.
834 var curAddrFound bool
835 for _, a := range addrs {
836 if reflect.DeepEqual(ac.curAddr, a) {
837 curAddrFound = true
838 break
839 }
840 }
841 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
842 if curAddrFound {
843 ac.addrs = addrs
844 }
845
846 return curAddrFound
847}
848
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for the input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e. /service/). If there is a default MethodConfig for
// the service, we return it.
// Otherwise, we return an empty MethodConfig.
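//
// For example (a hedged illustration; the names are placeholders): for method
// "/Greeter/SayHello", an entry keyed "/Greeter/SayHello" wins; otherwise an
// entry keyed "/Greeter/" (the service default) is used; otherwise the zero
// MethodConfig is returned.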
856func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
857 // TODO: Avoid the locking here.
858 cc.mu.RLock()
859 defer cc.mu.RUnlock()
860 if cc.sc == nil {
861 return MethodConfig{}
862 }
863 m, ok := cc.sc.Methods[method]
864 if !ok {
865 i := strings.LastIndex(method, "/")
866 m = cc.sc.Methods[method[:i+1]]
867 }
868 return m
869}
870
871func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
872 cc.mu.RLock()
873 defer cc.mu.RUnlock()
874 if cc.sc == nil {
875 return nil
876 }
877 return cc.sc.healthCheckConfig
878}
879
880func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
881 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
882 Ctx: ctx,
883 FullMethodName: method,
884 })
885 if err != nil {
886 return nil, nil, toRPCErr(err)
887 }
888 return t, done, nil
889}
890
891func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
892 if sc == nil {
893 // should never reach here.
894 return
895 }
896 cc.sc = sc
897
898 if cc.sc.retryThrottling != nil {
899 newThrottler := &retryThrottler{
900 tokens: cc.sc.retryThrottling.MaxTokens,
901 max: cc.sc.retryThrottling.MaxTokens,
902 thresh: cc.sc.retryThrottling.MaxTokens / 2,
903 ratio: cc.sc.retryThrottling.TokenRatio,
904 }
905 cc.retryThrottler.Store(newThrottler)
906 } else {
907 cc.retryThrottler.Store((*retryThrottler)(nil))
908 }
909
910 if cc.dopts.balancerBuilder == nil {
911 // Only look at balancer types and switch balancer if balancer dial
912 // option is not set.
913 var newBalancerName string
914 if cc.sc != nil && cc.sc.lbConfig != nil {
915 newBalancerName = cc.sc.lbConfig.name
916 } else {
917 var isGRPCLB bool
918 for _, a := range addrs {
919 if a.Type == resolver.GRPCLB {
920 isGRPCLB = true
921 break
922 }
923 }
924 if isGRPCLB {
925 newBalancerName = grpclbName
926 } else if cc.sc != nil && cc.sc.LB != nil {
927 newBalancerName = *cc.sc.LB
928 } else {
929 newBalancerName = PickFirstBalancerName
930 }
931 }
932 cc.switchBalancer(newBalancerName)
933 } else if cc.balancerWrapper == nil {
934 // Balancer dial option was set, and this is the first time handling
935 // resolved addresses. Build a balancer with dopts.balancerBuilder.
936 cc.curBalancerName = cc.dopts.balancerBuilder.Name()
937 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
938 }
939}
940
941func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
942 cc.mu.RLock()
943 r := cc.resolverWrapper
944 cc.mu.RUnlock()
945 if r == nil {
946 return
947 }
948 go r.resolveNow(o)
949}
950
951// ResetConnectBackoff wakes up all subchannels in transient failure and causes
952// them to attempt another connection immediately. It also resets the backoff
953// times used for subsequent attempts regardless of the current state.
954//
955// In general, this function should not be used. Typical service or network
956// outages result in a reasonable client reconnection strategy by default.
957// However, if a previously unavailable network becomes available, this may be
958// used to trigger an immediate reconnect.
959//
960// This API is EXPERIMENTAL.
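//
// A hedged usage sketch (networkBecameAvailable is a hypothetical,
// platform-specific notification channel, not part of this package):
//
//	go func() {
//		for range networkBecameAvailable { // e.g. fired when an interface comes back up
//			conn.ResetConnectBackoff()
//		}
//	}()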
961func (cc *ClientConn) ResetConnectBackoff() {
962 cc.mu.Lock()
963 conns := cc.conns
964 cc.mu.Unlock()
965 for ac := range conns {
966 ac.resetConnectBackoff()
967 }
968}
969
970// Close tears down the ClientConn and all underlying connections.
971func (cc *ClientConn) Close() error {
972 defer cc.cancel()
973
974 cc.mu.Lock()
975 if cc.conns == nil {
976 cc.mu.Unlock()
977 return ErrClientConnClosing
978 }
979 conns := cc.conns
980 cc.conns = nil
981 cc.csMgr.updateState(connectivity.Shutdown)
982
983 rWrapper := cc.resolverWrapper
984 cc.resolverWrapper = nil
985 bWrapper := cc.balancerWrapper
986 cc.balancerWrapper = nil
987 cc.mu.Unlock()
988
989 cc.blockingpicker.close()
990
991 if rWrapper != nil {
992 rWrapper.close()
993 }
994 if bWrapper != nil {
995 bWrapper.close()
996 }
997
998 for ac := range conns {
999 ac.tearDown(ErrClientConnClosing)
1000 }
1001 if channelz.IsOn() {
1002 ted := &channelz.TraceEventDesc{
1003 Desc: "Channel Deleted",
1004 Severity: channelz.CtINFO,
1005 }
1006 if cc.dopts.channelzParentID != 0 {
1007 ted.Parent = &channelz.TraceEventDesc{
1008 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1009 Severity: channelz.CtINFO,
1010 }
1011 }
1012 channelz.AddTraceEvent(cc.channelzID, 0, ted)
1013 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1014 // the entity being deleted, and thus prevent it from being deleted right away.
1015 channelz.RemoveEntry(cc.channelzID)
1016 }
1017 return nil
1018}
1019
1020// addrConn is a network connection to a given address.
1021type addrConn struct {
1022 ctx context.Context
1023 cancel context.CancelFunc
1024
1025 cc *ClientConn
1026 dopts dialOptions
1027 acbw balancer.SubConn
1028 scopts balancer.NewSubConnOptions
1029
1030 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
1031 // health checking may require server to report healthy to set ac to READY), and is reset
1032 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
1033 // is received, transport is closed, ac has been torn down).
1034 transport transport.ClientTransport // The current transport.
1035
1036 mu sync.Mutex
1037 curAddr resolver.Address // The current address.
1038 addrs []resolver.Address // All addresses that the resolver resolved to.
1039
1040 // Use updateConnectivityState for updating addrConn's connectivity state.
1041 state connectivity.State
1042
1043 backoffIdx int // Needs to be stateful for resetConnectBackoff.
1044 resetBackoff chan struct{}
1045
1046 channelzID int64 // channelz unique identification number.
1047 czData *channelzData
1048}
1049
1050// Note: this requires a lock on ac.mu.
1051func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1052 if ac.state == s {
1053 return
1054 }
1055 ac.state = s
1056 channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
1057 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1058}
1059
1060// adjustParams updates parameters used to create transports upon
1061// receiving a GoAway.
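//
// For example, if keepalive.ClientParameters.Time is 10s and the server sends
// GoAwayTooManyPings, the ClientConn-wide minimum keepalive time becomes at
// least 20s for transports created afterwards.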
1062func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1063 switch r {
1064 case transport.GoAwayTooManyPings:
1065 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1066 ac.cc.mu.Lock()
1067 if v > ac.cc.mkp.Time {
1068 ac.cc.mkp.Time = v
1069 }
1070 ac.cc.mu.Unlock()
1071 }
1072}
1073
1074func (ac *addrConn) resetTransport() {
1075 for i := 0; ; i++ {
1076 if i > 0 {
1077 ac.cc.resolveNow(resolver.ResolveNowOptions{})
1078 }
1079
1080 ac.mu.Lock()
1081 if ac.state == connectivity.Shutdown {
1082 ac.mu.Unlock()
1083 return
1084 }
1085
1086 addrs := ac.addrs
1087 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1088 // This will be the duration that dial gets to finish.
1089 dialDuration := minConnectTimeout
1090 if ac.dopts.minConnectTimeout != nil {
1091 dialDuration = ac.dopts.minConnectTimeout()
1092 }
1093
1094 if dialDuration < backoffFor {
1095 // Give dial more time as we keep failing to connect.
1096 dialDuration = backoffFor
1097 }
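		// For example (assuming the default exponential backoff, whose base
		// delay is on the order of a second): early attempts have backoffFor
		// well under minConnectTimeout (20s), so dialDuration stays at 20s;
		// after repeated failures backoffFor grows past 20s and then bounds
		// the dial deadline instead.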
1098 // We can potentially spend all the time trying the first address, and
1099 // if the server accepts the connection and then hangs, the following
1100 // addresses will never be tried.
1101 //
1102 // The spec doesn't mention what should be done for multiple addresses.
1103 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
1104 connectDeadline := time.Now().Add(dialDuration)
1105
1106 ac.updateConnectivityState(connectivity.Connecting, nil)
1107 ac.transport = nil
1108 ac.mu.Unlock()
1109
1110 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
1111 if err != nil {
1112 // After exhausting all addresses, the addrConn enters
1113 // TRANSIENT_FAILURE.
1114 ac.mu.Lock()
1115 if ac.state == connectivity.Shutdown {
1116 ac.mu.Unlock()
1117 return
1118 }
1119 ac.updateConnectivityState(connectivity.TransientFailure, err)
1120
1121 // Backoff.
1122 b := ac.resetBackoff
1123 ac.mu.Unlock()
1124
1125 timer := time.NewTimer(backoffFor)
1126 select {
1127 case <-timer.C:
1128 ac.mu.Lock()
1129 ac.backoffIdx++
1130 ac.mu.Unlock()
1131 case <-b:
1132 timer.Stop()
1133 case <-ac.ctx.Done():
1134 timer.Stop()
1135 return
1136 }
1137 continue
1138 }
1139
1140 ac.mu.Lock()
1141 if ac.state == connectivity.Shutdown {
1142 ac.mu.Unlock()
1143 newTr.Close()
1144 return
1145 }
1146 ac.curAddr = addr
1147 ac.transport = newTr
1148 ac.backoffIdx = 0
1149
1150 hctx, hcancel := context.WithCancel(ac.ctx)
1151 ac.startHealthCheck(hctx)
1152 ac.mu.Unlock()
1153
1154 // Block until the created transport is down. And when this happens,
1155 // we restart from the top of the addr list.
1156 <-reconnect.Done()
1157 hcancel()
1158 // restart connecting - the top of the loop will set state to
1159 // CONNECTING. This is against the current connectivity semantics doc,
1160 // however it allows for graceful behavior for RPCs not yet dispatched
1161 // - unfortunate timing would otherwise lead to the RPC failing even
1162 // though the TRANSIENT_FAILURE state (called for by the doc) would be
1163 // instantaneous.
1164 //
1165 // Ideally we should transition to Idle here and block until there is
1166 // RPC activity that leads to the balancer requesting a reconnect of
1167 // the associated SubConn.
1168 }
1169}
1170
// tryAllAddrs tries to create a connection to each of the addresses, stopping
// at the first successful one. It returns the transport, the address and an
// Event in the successful case. The Event fires when the returned transport
// disconnects.
1174func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1175 var firstConnErr error
1176 for _, addr := range addrs {
1177 ac.mu.Lock()
1178 if ac.state == connectivity.Shutdown {
1179 ac.mu.Unlock()
1180 return nil, resolver.Address{}, nil, errConnClosing
1181 }
1182
1183 ac.cc.mu.RLock()
1184 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1185 ac.cc.mu.RUnlock()
1186
1187 copts := ac.dopts.copts
1188 if ac.scopts.CredsBundle != nil {
1189 copts.CredsBundle = ac.scopts.CredsBundle
1190 }
1191 ac.mu.Unlock()
1192
1193 channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1194
1195 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1196 if err == nil {
1197 return newTr, addr, reconnect, nil
1198 }
1199 if firstConnErr == nil {
1200 firstConnErr = err
1201 }
1202 ac.cc.blockingpicker.updateConnectionError(err)
1203 }
1204
1205 // Couldn't connect to any address.
1206 return nil, resolver.Address{}, nil, firstConnErr
1207}
1208
1209// createTransport creates a connection to addr. It returns the transport and a
1210// Event in the successful case. The Event fires when the returned transport
1211// disconnects.
1212func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1213 prefaceReceived := make(chan struct{})
1214 onCloseCalled := make(chan struct{})
1215 reconnect := grpcsync.NewEvent()
1216
1217 authority := ac.cc.authority
	// addr.ServerName takes precedence over the ClientConn's authority, if present.
1219 if addr.ServerName != "" {
1220 authority = addr.ServerName
1221 }
1222
1223 target := transport.TargetInfo{
1224 Addr: addr.Addr,
1225 Metadata: addr.Metadata,
1226 Authority: authority,
1227 }
1228
1229 once := sync.Once{}
1230 onGoAway := func(r transport.GoAwayReason) {
1231 ac.mu.Lock()
1232 ac.adjustParams(r)
1233 once.Do(func() {
1234 if ac.state == connectivity.Ready {
1235 // Prevent this SubConn from being used for new RPCs by setting its
1236 // state to Connecting.
1237 //
1238 // TODO: this should be Idle when grpc-go properly supports it.
1239 ac.updateConnectivityState(connectivity.Connecting, nil)
1240 }
1241 })
1242 ac.mu.Unlock()
1243 reconnect.Fire()
1244 }
1245
1246 onClose := func() {
1247 ac.mu.Lock()
1248 once.Do(func() {
1249 if ac.state == connectivity.Ready {
1250 // Prevent this SubConn from being used for new RPCs by setting its
1251 // state to Connecting.
1252 //
1253 // TODO: this should be Idle when grpc-go properly supports it.
1254 ac.updateConnectivityState(connectivity.Connecting, nil)
1255 }
1256 })
1257 ac.mu.Unlock()
1258 close(onCloseCalled)
1259 reconnect.Fire()
1260 }
1261
1262 onPrefaceReceipt := func() {
1263 close(prefaceReceived)
1264 }
1265
1266 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1267 defer cancel()
1268 if channelz.IsOn() {
1269 copts.ChannelzParentID = ac.channelzID
1270 }
1271
1272 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1273 if err != nil {
1274 // newTr is either nil, or closed.
1275 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
1276 return nil, nil, err
1277 }
1278
1279 select {
1280 case <-time.After(time.Until(connectDeadline)):
1281 // We didn't get the preface in time.
1282 newTr.Close()
1283 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1284 return nil, nil, errors.New("timed out waiting for server handshake")
1285 case <-prefaceReceived:
1286 // We got the preface - huzzah! things are good.
1287 case <-onCloseCalled:
1288 // The transport has already closed - noop.
1289 return nil, nil, errors.New("connection closed")
1290 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1291 }
1292 return newTr, reconnect, nil
1293}
1294
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met
// (see the sketch after this comment):
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
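//
// A hedged configuration sketch for requirements 2 and 3 above (the empty
// service name is a placeholder meaning the server's overall health):
//
//	import _ "google.golang.org/grpc/health" // registers the client-side health check function
//
//	conn, err := grpc.Dial(target, grpc.WithInsecure(),
//		grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`))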
1307func (ac *addrConn) startHealthCheck(ctx context.Context) {
1308 var healthcheckManagingState bool
1309 defer func() {
1310 if !healthcheckManagingState {
1311 ac.updateConnectivityState(connectivity.Ready, nil)
1312 }
1313 }()
1314
1315 if ac.cc.dopts.disableHealthCheck {
1316 return
1317 }
1318 healthCheckConfig := ac.cc.healthCheckConfig()
1319 if healthCheckConfig == nil {
1320 return
1321 }
1322 if !ac.scopts.HealthCheckEnabled {
1323 return
1324 }
1325 healthCheckFunc := ac.cc.dopts.healthCheckFunc
1326 if healthCheckFunc == nil {
		// The health check function is not set because the grpc/health package was not imported.
1328 //
1329 // TODO: add a link to the health check doc in the error message.
1330 channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
1331 return
1332 }
1333
1334 healthcheckManagingState = true
1335
1336 // Set up the health check helper functions.
1337 currentTr := ac.transport
1338 newStream := func(method string) (interface{}, error) {
1339 ac.mu.Lock()
1340 if ac.transport != currentTr {
1341 ac.mu.Unlock()
1342 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1343 }
1344 ac.mu.Unlock()
1345 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1346 }
1347 setConnectivityState := func(s connectivity.State, lastErr error) {
1348 ac.mu.Lock()
1349 defer ac.mu.Unlock()
1350 if ac.transport != currentTr {
1351 return
1352 }
1353 ac.updateConnectivityState(s, lastErr)
1354 }
1355 // Start the health checking stream.
1356 go func() {
1357 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1358 if err != nil {
1359 if status.Code(err) == codes.Unimplemented {
1360 channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1361 } else {
1362 channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
1363 }
1364 }
1365 }()
1366}
1367
1368func (ac *addrConn) resetConnectBackoff() {
1369 ac.mu.Lock()
1370 close(ac.resetBackoff)
1371 ac.backoffIdx = 0
1372 ac.resetBackoff = make(chan struct{})
1373 ac.mu.Unlock()
1374}
1375
1376// getReadyTransport returns the transport if ac's state is READY.
1377// Otherwise it returns nil, false.
1378// If ac's state is IDLE, it will trigger ac to connect.
1379func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1380 ac.mu.Lock()
1381 if ac.state == connectivity.Ready && ac.transport != nil {
1382 t := ac.transport
1383 ac.mu.Unlock()
1384 return t, true
1385 }
1386 var idle bool
1387 if ac.state == connectivity.Idle {
1388 idle = true
1389 }
1390 ac.mu.Unlock()
1391 // Trigger idle ac to connect.
1392 if idle {
1393 ac.connect()
1394 }
1395 return nil, false
1396}
1397
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
1403func (ac *addrConn) tearDown(err error) {
1404 ac.mu.Lock()
1405 if ac.state == connectivity.Shutdown {
1406 ac.mu.Unlock()
1407 return
1408 }
1409 curTr := ac.transport
1410 ac.transport = nil
1411 // We have to set the state to Shutdown before anything else to prevent races
1412 // between setting the state and logic that waits on context cancellation / etc.
1413 ac.updateConnectivityState(connectivity.Shutdown, nil)
1414 ac.cancel()
1415 ac.curAddr = resolver.Address{}
1416 if err == errConnDrain && curTr != nil {
1417 // GracefulClose(...) may be executed multiple times when
1418 // i) receiving multiple GoAway frames from the server; or
1419 // ii) there are concurrent name resolver/Balancer triggered
1420 // address removal and GoAway.
1421 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1422 ac.mu.Unlock()
1423 curTr.GracefulClose()
1424 ac.mu.Lock()
1425 }
1426 if channelz.IsOn() {
1427 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
1428 Desc: "Subchannel Deleted",
1429 Severity: channelz.CtINFO,
1430 Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID),
1432 Severity: channelz.CtINFO,
1433 },
1434 })
1435 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1436 // the entity being deleted, and thus prevent it from being deleted right away.
1437 channelz.RemoveEntry(ac.channelzID)
1438 }
1439 ac.mu.Unlock()
1440}
1441
1442func (ac *addrConn) getState() connectivity.State {
1443 ac.mu.Lock()
1444 defer ac.mu.Unlock()
1445 return ac.state
1446}
1447
1448func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1449 ac.mu.Lock()
1450 addr := ac.curAddr.Addr
1451 ac.mu.Unlock()
1452 return &channelz.ChannelInternalMetric{
1453 State: ac.getState(),
1454 Target: addr,
1455 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1456 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1457 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1458 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1459 }
1460}
1461
1462func (ac *addrConn) incrCallsStarted() {
1463 atomic.AddInt64(&ac.czData.callsStarted, 1)
1464 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1465}
1466
1467func (ac *addrConn) incrCallsSucceeded() {
1468 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1469}
1470
1471func (ac *addrConn) incrCallsFailed() {
1472 atomic.AddInt64(&ac.czData.callsFailed, 1)
1473}
1474
1475type retryThrottler struct {
1476 max float64
1477 thresh float64
1478 ratio float64
1479
1480 mu sync.Mutex
1481 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1482}
1483
1484// throttle subtracts a retry token from the pool and returns whether a retry
1485// should be throttled (disallowed) based upon the retry throttling policy in
1486// the service config.
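//
// For example, with a retryThrottling policy of maxTokens 10 and tokenRatio
// 0.1: tokens start at 10 and thresh is 5; each throttle() call costs one
// token and retries are disallowed once tokens drop to 5 or below, while each
// successful RPC adds back 0.1 tokens (capped at 10).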
1487func (rt *retryThrottler) throttle() bool {
1488 if rt == nil {
1489 return false
1490 }
1491 rt.mu.Lock()
1492 defer rt.mu.Unlock()
1493 rt.tokens--
1494 if rt.tokens < 0 {
1495 rt.tokens = 0
1496 }
1497 return rt.tokens <= rt.thresh
1498}
1499
1500func (rt *retryThrottler) successfulRPC() {
1501 if rt == nil {
1502 return
1503 }
1504 rt.mu.Lock()
1505 defer rt.mu.Unlock()
1506 rt.tokens += rt.ratio
1507 if rt.tokens > rt.max {
1508 rt.tokens = rt.max
1509 }
1510}
1511
1512type channelzChannel struct {
1513 cc *ClientConn
1514}
1515
1516func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1517 return c.cc.channelzMetric()
1518}
1519
1520// ErrClientConnTimeout indicates that the ClientConn cannot establish the
1521// underlying connections within the specified timeout.
1522//
1523// Deprecated: This error is never returned by grpc and should not be
1524// referenced by users.
1525var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1526
1527func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1528 for _, rb := range cc.dopts.resolvers {
1529 if scheme == rb.Scheme() {
1530 return rb
1531 }
1532 }
1533 return resolver.Get(scheme)
1534}