| /* |
| * |
| * Copyright 2018 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpc |
| |
| import ( |
| "context" |
| "fmt" |
| "net" |
| "time" |
| |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/credentials" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/backoff" |
| "google.golang.org/grpc/internal/envconfig" |
| "google.golang.org/grpc/internal/transport" |
| "google.golang.org/grpc/keepalive" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/stats" |
| ) |
| |
| // dialOptions configure a Dial call. dialOptions are set by the DialOption |
| // values passed to Dial. |
| type dialOptions struct { |
| unaryInt UnaryClientInterceptor |
| streamInt StreamClientInterceptor |
| cp Compressor |
| dc Decompressor |
| bs backoff.Strategy |
| block bool |
| insecure bool |
| timeout time.Duration |
| scChan <-chan ServiceConfig |
| authority string |
| copts transport.ConnectOptions |
| callOptions []CallOption |
| // This is used by v1 balancer dial option WithBalancer to support v1 |
| // balancer, and also by WithBalancerName dial option. |
| balancerBuilder balancer.Builder |
| // This is to support grpclb. |
| resolverBuilder resolver.Builder |
| reqHandshake envconfig.RequireHandshakeSetting |
| channelzParentID int64 |
| disableServiceConfig bool |
| disableRetry bool |
| disableHealthCheck bool |
| healthCheckFunc internal.HealthChecker |
| } |
| |
| // DialOption configures how we set up the connection. |
| type DialOption interface { |
| apply(*dialOptions) |
| } |
| |
| // EmptyDialOption does not alter the dial configuration. It can be embedded in |
| // another structure to build custom dial options. |
| // |
| // This API is EXPERIMENTAL. |
| type EmptyDialOption struct{} |
| |
| func (EmptyDialOption) apply(*dialOptions) {} |
| |
| // funcDialOption wraps a function that modifies dialOptions into an |
| // implementation of the DialOption interface. |
| type funcDialOption struct { |
| f func(*dialOptions) |
| } |
| |
| func (fdo *funcDialOption) apply(do *dialOptions) { |
| fdo.f(do) |
| } |
| |
| func newFuncDialOption(f func(*dialOptions)) *funcDialOption { |
| return &funcDialOption{ |
| f: f, |
| } |
| } |
| |
| // WithWaitForHandshake blocks until the initial settings frame is received from |
| // the server before assigning RPCs to the connection. |
| // |
| // Deprecated: this is the default behavior, and this option will be removed |
| // after the 1.18 release. |
| func WithWaitForHandshake() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.reqHandshake = envconfig.RequireHandshakeOn |
| }) |
| } |
| |
| // WithWriteBufferSize determines how much data can be batched before doing a |
| // write on the wire. The corresponding memory allocation for this buffer will |
| // be twice the size to keep syscalls low. The default value for this buffer is |
| // 32KB. |
| // |
| // Zero will disable the write buffer such that each write will be on underlying |
| // connection. Note: A Send call may not directly translate to a write. |
| func WithWriteBufferSize(s int) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.WriteBufferSize = s |
| }) |
| } |
| |
| // WithReadBufferSize lets you set the size of read buffer, this determines how |
| // much data can be read at most for each read syscall. |
| // |
| // The default value for this buffer is 32KB. Zero will disable read buffer for |
| // a connection so data framer can access the underlying conn directly. |
| func WithReadBufferSize(s int) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.ReadBufferSize = s |
| }) |
| } |
| |
| // WithInitialWindowSize returns a DialOption which sets the value for initial |
| // window size on a stream. The lower bound for window size is 64K and any value |
| // smaller than that will be ignored. |
| func WithInitialWindowSize(s int32) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.InitialWindowSize = s |
| }) |
| } |
| |
| // WithInitialConnWindowSize returns a DialOption which sets the value for |
| // initial window size on a connection. The lower bound for window size is 64K |
| // and any value smaller than that will be ignored. |
| func WithInitialConnWindowSize(s int32) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.InitialConnWindowSize = s |
| }) |
| } |
| |
| // WithMaxMsgSize returns a DialOption which sets the maximum message size the |
| // client can receive. |
| // |
| // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. |
| func WithMaxMsgSize(s int) DialOption { |
| return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) |
| } |
| |
| // WithDefaultCallOptions returns a DialOption which sets the default |
| // CallOptions for calls over the connection. |
| func WithDefaultCallOptions(cos ...CallOption) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.callOptions = append(o.callOptions, cos...) |
| }) |
| } |
| |
| // WithCodec returns a DialOption which sets a codec for message marshaling and |
| // unmarshaling. |
| // |
| // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead. |
| func WithCodec(c Codec) DialOption { |
| return WithDefaultCallOptions(CallCustomCodec(c)) |
| } |
| |
| // WithCompressor returns a DialOption which sets a Compressor to use for |
| // message compression. It has lower priority than the compressor set by the |
| // UseCompressor CallOption. |
| // |
| // Deprecated: use UseCompressor instead. |
| func WithCompressor(cp Compressor) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.cp = cp |
| }) |
| } |
| |
| // WithDecompressor returns a DialOption which sets a Decompressor to use for |
| // incoming message decompression. If incoming response messages are encoded |
| // using the decompressor's Type(), it will be used. Otherwise, the message |
| // encoding will be used to look up the compressor registered via |
| // encoding.RegisterCompressor, which will then be used to decompress the |
| // message. If no compressor is registered for the encoding, an Unimplemented |
| // status error will be returned. |
| // |
| // Deprecated: use encoding.RegisterCompressor instead. |
| func WithDecompressor(dc Decompressor) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.dc = dc |
| }) |
| } |
| |
| // WithBalancer returns a DialOption which sets a load balancer with the v1 API. |
| // Name resolver will be ignored if this DialOption is specified. |
| // |
| // Deprecated: use the new balancer APIs in balancer package and |
| // WithBalancerName. |
| func WithBalancer(b Balancer) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.balancerBuilder = &balancerWrapperBuilder{ |
| b: b, |
| } |
| }) |
| } |
| |
| // WithBalancerName sets the balancer that the ClientConn will be initialized |
| // with. Balancer registered with balancerName will be used. This function |
| // panics if no balancer was registered by balancerName. |
| // |
| // The balancer cannot be overridden by balancer option specified by service |
| // config. |
| // |
| // This is an EXPERIMENTAL API. |
| func WithBalancerName(balancerName string) DialOption { |
| builder := balancer.Get(balancerName) |
| if builder == nil { |
| panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName)) |
| } |
| return newFuncDialOption(func(o *dialOptions) { |
| o.balancerBuilder = builder |
| }) |
| } |
| |
| // withResolverBuilder is only for grpclb. |
| func withResolverBuilder(b resolver.Builder) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.resolverBuilder = b |
| }) |
| } |
| |
| // WithServiceConfig returns a DialOption which has a channel to read the |
| // service configuration. |
| // |
| // Deprecated: service config should be received through name resolver, as |
| // specified here. |
| // https://github.com/grpc/grpc/blob/master/doc/service_config.md |
| func WithServiceConfig(c <-chan ServiceConfig) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.scChan = c |
| }) |
| } |
| |
| // WithBackoffMaxDelay configures the dialer to use the provided maximum delay |
| // when backing off after failed connection attempts. |
| func WithBackoffMaxDelay(md time.Duration) DialOption { |
| return WithBackoffConfig(BackoffConfig{MaxDelay: md}) |
| } |
| |
| // WithBackoffConfig configures the dialer to use the provided backoff |
| // parameters after connection failures. |
| // |
| // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up |
| // for use. |
| func WithBackoffConfig(b BackoffConfig) DialOption { |
| return withBackoff(backoff.Exponential{ |
| MaxDelay: b.MaxDelay, |
| }) |
| } |
| |
| // withBackoff sets the backoff strategy used for connectRetryNum after a failed |
| // connection attempt. |
| // |
| // This can be exported if arbitrary backoff strategies are allowed by gRPC. |
| func withBackoff(bs backoff.Strategy) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.bs = bs |
| }) |
| } |
| |
| // WithBlock returns a DialOption which makes caller of Dial blocks until the |
| // underlying connection is up. Without this, Dial returns immediately and |
| // connecting the server happens in background. |
| func WithBlock() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.block = true |
| }) |
| } |
| |
| // WithInsecure returns a DialOption which disables transport security for this |
| // ClientConn. Note that transport security is required unless WithInsecure is |
| // set. |
| func WithInsecure() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.insecure = true |
| }) |
| } |
| |
| // WithTransportCredentials returns a DialOption which configures a connection |
| // level security credentials (e.g., TLS/SSL). This should not be used together |
| // with WithCredentialsBundle. |
| func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.TransportCredentials = creds |
| }) |
| } |
| |
| // WithPerRPCCredentials returns a DialOption which sets credentials and places |
| // auth state on each outbound RPC. |
| func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) |
| }) |
| } |
| |
| // WithCredentialsBundle returns a DialOption to set a credentials bundle for |
| // the ClientConn.WithCreds. This should not be used together with |
| // WithTransportCredentials. |
| // |
| // This API is experimental. |
| func WithCredentialsBundle(b credentials.Bundle) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.CredsBundle = b |
| }) |
| } |
| |
| // WithTimeout returns a DialOption that configures a timeout for dialing a |
| // ClientConn initially. This is valid if and only if WithBlock() is present. |
| // |
| // Deprecated: use DialContext and context.WithTimeout instead. |
| func WithTimeout(d time.Duration) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.timeout = d |
| }) |
| } |
| |
| func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.Dialer = f |
| }) |
| } |
| |
| func init() { |
| internal.WithContextDialer = withContextDialer |
| internal.WithResolverBuilder = withResolverBuilder |
| internal.WithHealthCheckFunc = withHealthCheckFunc |
| } |
| |
| // WithDialer returns a DialOption that specifies a function to use for dialing |
| // network addresses. If FailOnNonTempDialError() is set to true, and an error |
| // is returned by f, gRPC checks the error's Temporary() method to decide if it |
| // should try to reconnect to the network address. |
| func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { |
| return withContextDialer( |
| func(ctx context.Context, addr string) (net.Conn, error) { |
| if deadline, ok := ctx.Deadline(); ok { |
| return f(addr, deadline.Sub(time.Now())) |
| } |
| return f(addr, 0) |
| }) |
| } |
| |
| // WithStatsHandler returns a DialOption that specifies the stats handler for |
| // all the RPCs and underlying network connections in this ClientConn. |
| func WithStatsHandler(h stats.Handler) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.StatsHandler = h |
| }) |
| } |
| |
| // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on |
| // non-temporary dial errors. If f is true, and dialer returns a non-temporary |
| // error, gRPC will fail the connection to the network address and won't try to |
| // reconnect. The default value of FailOnNonTempDialError is false. |
| // |
| // FailOnNonTempDialError only affects the initial dial, and does not do |
| // anything useful unless you are also using WithBlock(). |
| // |
| // This is an EXPERIMENTAL API. |
| func FailOnNonTempDialError(f bool) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.FailOnNonTempDialError = f |
| }) |
| } |
| |
| // WithUserAgent returns a DialOption that specifies a user agent string for all |
| // the RPCs. |
| func WithUserAgent(s string) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.UserAgent = s |
| }) |
| } |
| |
| // WithKeepaliveParams returns a DialOption that specifies keepalive parameters |
| // for the client transport. |
| func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.KeepaliveParams = kp |
| }) |
| } |
| |
| // WithUnaryInterceptor returns a DialOption that specifies the interceptor for |
| // unary RPCs. |
| func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.unaryInt = f |
| }) |
| } |
| |
| // WithStreamInterceptor returns a DialOption that specifies the interceptor for |
| // streaming RPCs. |
| func WithStreamInterceptor(f StreamClientInterceptor) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.streamInt = f |
| }) |
| } |
| |
| // WithAuthority returns a DialOption that specifies the value to be used as the |
| // :authority pseudo-header. This value only works with WithInsecure and has no |
| // effect if TransportCredentials are present. |
| func WithAuthority(a string) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.authority = a |
| }) |
| } |
| |
| // WithChannelzParentID returns a DialOption that specifies the channelz ID of |
| // current ClientConn's parent. This function is used in nested channel creation |
| // (e.g. grpclb dial). |
| func WithChannelzParentID(id int64) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.channelzParentID = id |
| }) |
| } |
| |
| // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any |
| // service config provided by the resolver and provides a hint to the resolver |
| // to not fetch service configs. |
| func WithDisableServiceConfig() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.disableServiceConfig = true |
| }) |
| } |
| |
| // WithDisableRetry returns a DialOption that disables retries, even if the |
| // service config enables them. This does not impact transparent retries, which |
| // will happen automatically if no data is written to the wire or if the RPC is |
| // unprocessed by the remote server. |
| // |
| // Retry support is currently disabled by default, but will be enabled by |
| // default in the future. Until then, it may be enabled by setting the |
| // environment variable "GRPC_GO_RETRY" to "on". |
| // |
| // This API is EXPERIMENTAL. |
| func WithDisableRetry() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.disableRetry = true |
| }) |
| } |
| |
| // WithMaxHeaderListSize returns a DialOption that specifies the maximum |
| // (uncompressed) size of header list that the client is prepared to accept. |
| func WithMaxHeaderListSize(s uint32) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.copts.MaxHeaderListSize = &s |
| }) |
| } |
| |
| // WithDisableHealthCheck disables the LB channel health checking for all SubConns of this ClientConn. |
| // |
| // This API is EXPERIMENTAL. |
| func WithDisableHealthCheck() DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.disableHealthCheck = true |
| }) |
| } |
| |
| // withHealthCheckFunc replaces the default health check function with the provided one. It makes |
| // tests easier to change the health check function. |
| // |
| // For testing purpose only. |
| func withHealthCheckFunc(f internal.HealthChecker) DialOption { |
| return newFuncDialOption(func(o *dialOptions) { |
| o.healthCheckFunc = f |
| }) |
| } |
| |
| func defaultDialOptions() dialOptions { |
| return dialOptions{ |
| disableRetry: !envconfig.Retry, |
| reqHandshake: envconfig.RequireHandshake, |
| healthCheckFunc: internal.HealthCheckFunc, |
| copts: transport.ConnectOptions{ |
| WriteBufferSize: defaultWriteBufSize, |
| ReadBufferSize: defaultReadBufSize, |
| }, |
| } |
| } |