blob: c49e4ba1237aa7bc9833f986144909da593a28cd [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "net"
22 "os"
23 "strconv"
24 "strings"
25 "sync"
26 "time"
27
28 "github.com/coreos/etcd/clientv3/balancer"
29 "github.com/coreos/etcd/clientv3/balancer/picker"
30 "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
31 "github.com/coreos/etcd/clientv3/credentials"
32 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
33 "github.com/coreos/etcd/pkg/logutil"
34 "github.com/coreos/pkg/capnslog"
35 "github.com/google/uuid"
36 "go.uber.org/zap"
37 "google.golang.org/grpc"
38 "google.golang.org/grpc/codes"
39 grpccredentials "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/keepalive"
41 "google.golang.org/grpc/status"
42)
43
44var (
45 ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
46 ErrOldCluster = errors.New("etcdclient: old cluster version")
47
48 roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
49)
50
51var (
52 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "clientv3")
53)
54
55func init() {
56 lg := zap.NewNop()
57 if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
58 lcfg := logutil.DefaultZapLoggerConfig
59 lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
60
61 var err error
62 lg, err = lcfg.Build() // info level logging
63 if err != nil {
64 panic(err)
65 }
66 }
67
68 // TODO: support custom balancer
69 balancer.RegisterBuilder(balancer.Config{
70 Policy: picker.RoundrobinBalanced,
71 Name: roundRobinBalancerName,
72 Logger: lg,
73 })
74}
75
// Client provides and manages an etcd v3 client session.
//
// The embedded interfaces expose the etcd APIs. newClient fills them in once
// the gRPC connection is established. NewCtxClient leaves them unset, for
// embedders that supply their own implementations.
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// conn is the balanced gRPC connection dialed by newClient; it is nil for
	// clients built with NewCtxClient (see ActiveConnection).
	conn *grpc.ClientConn

	// cfg is the copy of the Config the client was built from. Its Endpoints
	// field is read and replaced under mu.
	cfg Config
	// creds are the transport credentials built from cfg.TLS, or nil when no
	// TLS config was given.
	creds grpccredentials.TransportCredentials
	// resolverGroup is the per-client endpoint resolver whose targets are
	// dialed through the balancer; nil for NewCtxClient clients.
	resolverGroup *endpoint.ResolverGroup
	// mu guards cfg.Endpoints (see Endpoints and SetEndpoints).
	mu *sync.RWMutex

	// ctx is the client's lifetime context; Close cancels it through cancel.
	ctx    context.Context
	cancel context.CancelFunc

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password string
	// authTokenBundle carries the token obtained by getToken. dial creates it
	// when both Username and Password are set.
	authTokenBundle credentials.Bundle

	// callOpts are the default per-call gRPC options: defaultCallOpts, or a
	// variant with custom message-size limits chosen in newClient.
	callOpts []grpc.CallOption

	// lg is the zap logger built from cfg.LogConfig (or the default config).
	lg *zap.Logger
}
105
106// New creates a new etcdv3 client from a given configuration.
107func New(cfg Config) (*Client, error) {
108 if len(cfg.Endpoints) == 0 {
109 return nil, ErrNoAvailableEndpoints
110 }
111
112 return newClient(&cfg)
113}
114
// NewCtxClient creates a client with a context but no underlying grpc
// connection. This is useful for embedded cases that override the
// service interface implementations and do not need connection management.
//
// The returned client's context is a cancelable child of ctx, canceled by
// Close. Its endpoint mutex and logger are initialized so that Endpoints,
// SetEndpoints and the retry backoff do not dereference nil.
func NewCtxClient(ctx context.Context) *Client {
	cctx, cancel := context.WithCancel(ctx)
	return &Client{
		ctx:    cctx,
		cancel: cancel,
		// Endpoints/SetEndpoints lock mu unconditionally.
		mu: new(sync.RWMutex),
		// roundRobinQuorumBackoff logs through lg; a nil *zap.Logger panics.
		lg: zap.NewNop(),
	}
}
122
123// NewFromURL creates a new etcdv3 client from a URL.
124func NewFromURL(url string) (*Client, error) {
125 return New(Config{Endpoints: []string{url}})
126}
127
128// NewFromURLs creates a new etcdv3 client from URLs.
129func NewFromURLs(urls []string) (*Client, error) {
130 return New(Config{Endpoints: urls})
131}
132
133// Close shuts down the client's etcd connections.
134func (c *Client) Close() error {
135 c.cancel()
136 c.Watcher.Close()
137 c.Lease.Close()
138 if c.resolverGroup != nil {
139 c.resolverGroup.Close()
140 }
141 if c.conn != nil {
142 return toErr(c.ctx, c.conn.Close())
143 }
144 return c.ctx.Err()
145}
146
// Ctx returns the client's context, meant for "out of band" messages (e.g.
// sending a "clean up" message after another context has been canceled).
// It is canceled when the client is closed.
func (c *Client) Ctx() context.Context {
	return c.ctx
}
151
152// Endpoints lists the registered endpoints for the client.
153func (c *Client) Endpoints() []string {
154 // copy the slice; protect original endpoints from being changed
155 c.mu.RLock()
156 defer c.mu.RUnlock()
157 eps := make([]string, len(c.cfg.Endpoints))
158 copy(eps, c.cfg.Endpoints)
159 return eps
160}
161
162// SetEndpoints updates client's endpoints.
163func (c *Client) SetEndpoints(eps ...string) {
164 c.mu.Lock()
165 defer c.mu.Unlock()
166 c.cfg.Endpoints = eps
167 c.resolverGroup.SetEndpoints(eps)
168}
169
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
//
// The client URLs of all members reported by MemberList replace the client's
// endpoints via SetEndpoints. If the membership yields no client URLs at
// all, the current endpoints are kept and ErrNoAvailableEndpoints is
// returned. Installing an empty list would leave the resolver nothing to
// dial, including for any later Sync.
func (c *Client) Sync(ctx context.Context) error {
	mresp, err := c.MemberList(ctx)
	if err != nil {
		return err
	}
	var eps []string
	for _, m := range mresp.Members {
		eps = append(eps, m.ClientURLs...)
	}
	if len(eps) == 0 {
		return ErrNoAvailableEndpoints
	}
	c.SetEndpoints(eps...)
	return nil
}
183
// autoSync calls Sync every cfg.AutoSyncInterval until the client context is
// canceled. Each Sync is bounded by a 5s timeout. Failures are logged unless
// they merely reflect the client context's own error.
//
// A zero or negative interval disables auto-sync. A negative interval would
// otherwise make the timer fire immediately on every iteration, turning
// the loop into back-to-back MemberList calls.
func (c *Client) autoSync() {
	interval := c.cfg.AutoSyncInterval
	if interval <= 0 {
		return
	}

	// One reusable timer instead of a new time.After per iteration; Stop on
	// exit releases the pending timer as soon as the client closes.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-timer.C:
			ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
			err := c.Sync(ctx)
			cancel()
			if err != nil && err != c.ctx.Err() {
				lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
			}
			// The timer fired and its channel was drained above, so Reset is
			// safe. As before, the next sync starts one interval after this
			// one finished.
			timer.Reset(interval)
		}
	}
}
203
204func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
205 creds = c.creds
206 switch scheme {
207 case "unix":
208 case "http":
209 creds = nil
210 case "https", "unixs":
211 if creds != nil {
212 break
213 }
214 creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
215 default:
216 creds = nil
217 }
218 return creds
219}
220
221// dialSetupOpts gives the dial opts prior to any authentication.
222func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
223 if c.cfg.DialKeepAliveTime > 0 {
224 params := keepalive.ClientParameters{
225 Time: c.cfg.DialKeepAliveTime,
226 Timeout: c.cfg.DialKeepAliveTimeout,
227 PermitWithoutStream: c.cfg.PermitWithoutStream,
228 }
229 opts = append(opts, grpc.WithKeepaliveParams(params))
230 }
231 opts = append(opts, dopts...)
232
233 dialer := endpoint.Dialer
234 if creds != nil {
235 opts = append(opts, grpc.WithTransportCredentials(creds))
236 // gRPC load balancer workaround. See credentials.transportCredential for details.
237 if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
238 dialer = credsDialer.Dialer
239 }
240 } else {
241 opts = append(opts, grpc.WithInsecure())
242 }
243 opts = append(opts, grpc.WithContextDialer(dialer))
244
245 // Interceptor retry and backoff.
246 // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
247 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
248 // once it is available.
249 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
250 opts = append(opts,
251 // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
252 // Streams that are safe to retry are enabled individually.
253 grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
254 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
255 )
256
257 return opts, nil
258}
259
260// Dial connects to a single endpoint using the client's config.
261func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
262 creds, err := c.directDialCreds(ep)
263 if err != nil {
264 return nil, err
265 }
266 // Use the grpc passthrough resolver to directly dial a single endpoint.
267 // This resolver passes through the 'unix' and 'unixs' endpoints schemes used
268 // by etcd without modification, allowing us to directly dial endpoints and
269 // using the same dial functions that we use for load balancer dialing.
270 return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
271}
272
// getToken authenticates with c.Username/c.Password and stores the returned
// token in c.authTokenBundle.
//
// Endpoints are tried in order and the first successful authentication
// wins. rpctypes.ErrAuthNotEnabled is returned immediately without trying
// further endpoints. Otherwise the error of the last attempt is returned
// (nil when there are no endpoints).
//
// Each attempt's authenticator is closed as soon as that attempt finishes.
// Deferring the closes in the loop body would keep every failed endpoint's
// authenticator open until getToken returned.
func (c *Client) getToken(ctx context.Context) error {
	var err error // return last error in a case of fail

	for _, ep := range c.Endpoints() {
		var resp *AuthenticateResponse
		resp, err = func() (*AuthenticateResponse, error) {
			// use dial options without dopts to avoid reusing the client balancer
			_, host, _ := endpoint.ParseEndpoint(ep)
			target := c.resolverGroup.Target(host)
			creds := c.dialWithBalancerCreds(ep)
			dOpts, derr := c.dialSetupOpts(creds, c.cfg.DialOptions...)
			if derr != nil {
				return nil, fmt.Errorf("failed to configure auth dialer: %v", derr)
			}
			dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))

			auth, aerr := newAuthenticator(ctx, target, dOpts, c)
			if aerr != nil {
				return nil, aerr
			}
			defer auth.close()
			return auth.authenticate(ctx, c.Username, c.Password)
		}()
		if err != nil {
			// return err without retrying other endpoints
			if err == rpctypes.ErrAuthNotEnabled {
				return err
			}
			continue
		}

		c.authTokenBundle.UpdateAuthToken(resp.Token)
		return nil
	}

	return err
}
312
313// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
314// of the provided endpoint determines the scheme used for all endpoints of the client connection.
315func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
316 _, host, _ := endpoint.ParseEndpoint(ep)
317 target := c.resolverGroup.Target(host)
318 creds := c.dialWithBalancerCreds(ep)
319 return c.dial(target, creds, dopts...)
320}
321
// dial configures and dials any grpc balancer target.
//
// When both Username and Password are set, a token is first fetched with
// getToken, bounded by DialTimeout when positive, and attached as per-RPC
// credentials. If the server reports that auth is not enabled, the
// connection is made without them. Any other token error aborts the dial.
// The config's DialOptions are appended after the setup options. The dial
// itself is bounded by DialTimeout when positive.
func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	opts, err := c.dialSetupOpts(creds, dopts...)
	if err != nil {
		return nil, fmt.Errorf("failed to configure dialer: %v", err)
	}

	if c.Username != "" && c.Password != "" {
		c.authTokenBundle = credentials.NewBundle(credentials.Config{})

		tokenCtx, cancelToken := c.ctx, func() {}
		if c.cfg.DialTimeout > 0 {
			tokenCtx, cancelToken = context.WithTimeout(tokenCtx, c.cfg.DialTimeout)
		}

		err = c.getToken(tokenCtx)
		switch {
		case err == nil:
			opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
		case toErr(tokenCtx, err) == rpctypes.ErrAuthNotEnabled:
			// Auth is disabled on the server: dial without per-RPC credentials.
		default:
			if err == tokenCtx.Err() && tokenCtx.Err() != c.ctx.Err() {
				err = context.DeadlineExceeded
			}
			cancelToken()
			return nil, err
		}
		cancelToken()
	}

	opts = append(opts, c.cfg.DialOptions...)

	dialCtx := c.ctx
	if c.cfg.DialTimeout > 0 {
		var cancelDial context.CancelFunc
		dialCtx, cancelDial = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
		defer cancelDial() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
	}

	conn, err := grpc.DialContext(dialCtx, target, opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
367
// directDialCreds returns the transport credentials for dialing ep directly
// (see Dial).
//
// Without a scheme, the client's configured credentials are used as-is.
// With a scheme, processCreds chooses the credentials. A non-nil choice is
// cloned and its server name overridden with the endpoint host minus any
// port. The error result is currently always nil.
func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
	_, host, scheme := endpoint.ParseEndpoint(ep)
	if scheme == "" {
		return c.creds, nil
	}

	creds := c.processCreds(scheme)
	if creds == nil {
		return creds, nil
	}

	// grpc would otherwise check the x509 certificate against the full
	// endpoint (scheme and port included), which fails. When the host has no
	// port or cannot be split, the original host string is used.
	serverName := host
	if hostOnly, _, splitErr := net.SplitHostPort(host); splitErr == nil {
		serverName = hostOnly
	}

	clone := creds.Clone()
	clone.OverrideServerName(serverName)
	return clone, nil
}
390
391func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
392 _, _, scheme := endpoint.ParseEndpoint(ep)
393 creds := c.creds
394 if len(scheme) != 0 {
395 creds = c.processCreds(scheme)
396 }
397 return creds
398}
399
// newClient builds a Client from cfg, where a nil cfg is treated as an empty
// Config. It dials the first endpoint's target through the client's resolver
// group and round-robin balancer, wires up the API implementations,
// optionally rejects clusters older than 3.2, and starts autoSync.
//
// The configuration is validated before the client context, resolver group
// or connection exist, so those returns need no cleanup. Every later error
// path releases what has been created so far.
func newClient(cfg *Config) (*Client, error) {
	if cfg == nil {
		cfg = &Config{}
	}

	if len(cfg.Endpoints) < 1 {
		return nil, errors.New("at least one Endpoint is required in client config")
	}
	if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
		return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
	}

	lcfg := logutil.DefaultZapLoggerConfig
	if cfg.LogConfig != nil {
		lcfg = *cfg.LogConfig
	}
	logger, err := lcfg.Build()
	if err != nil {
		return nil, err
	}

	var creds grpccredentials.TransportCredentials
	if cfg.TLS != nil {
		creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
	}

	// use a temporary skeleton client to bootstrap first connection
	baseCtx := context.TODO()
	if cfg.Context != nil {
		baseCtx = cfg.Context
	}

	ctx, cancel := context.WithCancel(baseCtx)
	client := &Client{
		conn:     nil,
		cfg:      *cfg,
		creds:    creds,
		ctx:      ctx,
		cancel:   cancel,
		mu:       new(sync.RWMutex),
		callOpts: defaultCallOpts,
		lg:       logger,
	}

	if cfg.Username != "" && cfg.Password != "" {
		client.Username = cfg.Username
		client.Password = cfg.Password
	}
	if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
		callOpts := []grpc.CallOption{
			defaultFailFast,
			defaultMaxCallSendMsgSize,
			defaultMaxCallRecvMsgSize,
		}
		if cfg.MaxCallSendMsgSize > 0 {
			callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
		}
		if cfg.MaxCallRecvMsgSize > 0 {
			callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
		}
		client.callOpts = callOpts
	}

	// Prepare an 'endpoint://<unique-client-id>/' resolver for the client and create an endpoint target to pass
	// to dial so the client knows to use this resolver.
	client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
	if err != nil {
		client.cancel()
		return nil, err
	}
	client.resolverGroup.SetEndpoints(cfg.Endpoints)

	dialEndpoint := cfg.Endpoints[0]

	// Use a provided endpoint target so that for https:// without any tls config given, then
	// grpc will assume the certificate server name is the endpoint host.
	conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
	if err != nil {
		client.cancel()
		client.resolverGroup.Close()
		return nil, err
	}
	// TODO: With the old grpc balancer interface, we waited until the dial timeout
	// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
	client.conn = conn

	client.Cluster = NewCluster(client)
	client.KV = NewKV(client)
	client.Lease = NewLease(client)
	client.Watcher = NewWatcher(client)
	client.Auth = NewAuth(client)
	client.Maintenance = NewMaintenance(client)

	if cfg.RejectOldCluster {
		if err := client.checkVersion(); err != nil {
			client.Close()
			return nil, err
		}
	}

	go client.autoSync()
	return client, nil
}
501
// roundRobinQuorumBackoff retries against quorum between each backoff.
// This is intended for use with a round robin load balancer.
//
// With n endpoints and quorum = n/2+1, every attempt that is a multiple of
// quorum waits jitterUp(waitBetween, jitterFraction). All other attempts
// retry immediately (zero wait).
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
	return func(attempt uint) time.Duration {
		quorum := uint(len(c.Endpoints()))/2 + 1
		if attempt%quorum != 0 {
			c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
			return 0
		}
		c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
		return jitterUp(waitBetween, jitterFraction)
	}
}
517
// checkVersion queries Status on every endpoint concurrently. It returns nil
// as soon as one endpoint reports etcd 3.2 or newer.
//
// Otherwise it returns the error of the last endpoint to report: a Status
// error, a version parse error, or ErrOldCluster. A version string with
// fewer than two dot-separated parts counts as old. The probes share a
// context bounded by DialTimeout when positive. That context is canceled,
// and all probes are waited for, before returning.
func (c *Client) checkVersion() (err error) {
	eps := c.Endpoints()

	var (
		ctx    context.Context
		cancel context.CancelFunc
	)
	if c.cfg.DialTimeout > 0 {
		ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
	} else {
		ctx, cancel = context.WithCancel(c.ctx)
	}

	// probe reports nil when ep runs etcd >= 3.2.
	probe := func(ep string) error {
		resp, serr := c.Status(ctx, ep)
		if serr != nil {
			return serr
		}
		major, minor := 0, 0
		if parts := strings.Split(resp.Version, "."); len(parts) >= 2 {
			var perr error
			if major, perr = strconv.Atoi(parts[0]); perr != nil {
				return perr
			}
			if minor, perr = strconv.Atoi(parts[1]); perr != nil {
				return perr
			}
		}
		if major < 3 || (major == 3 && minor < 2) {
			return ErrOldCluster
		}
		return nil
	}

	// Buffered for every probe so none of them blocks once we stop reading.
	results := make(chan error, len(eps))
	var wg sync.WaitGroup
	wg.Add(len(eps))
	for _, ep := range eps {
		// if cluster is current, any endpoint gives a recent version
		go func(target string) {
			defer wg.Done()
			results <- probe(target)
		}(ep)
	}

	// wait for success
	for i := 0; i < len(eps); i++ {
		if err = <-results; err == nil {
			break
		}
	}
	cancel()
	wg.Wait()
	return err
}
568
569// ActiveConnection returns the current in-use connection
570func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
571
// isHaltErr returns true if the given error and context indicate no forward
// progress can be made, even after reconnecting.
//
// A done (non-nil) context always halts, and a nil error never does. Every
// other error halts unless its gRPC code is Unavailable or Internal.
// Non-status errors carry code Unknown and therefore halt.
func isHaltErr(ctx context.Context, err error) bool {
	if ctx != nil && ctx.Err() != nil {
		return true
	}
	if err == nil {
		return false
	}
	ev, _ := status.FromError(err)
	switch ev.Code() {
	case codes.Unavailable:
		// The system will be right back (e.g., can't connect, lost leader).
		return false
	case codes.Internal:
		// Something failed mid-operation (e.g., failed in middle of send,
		// corrupted frame); retrying could make progress.
		// TODO: are permanent Internal errors possible from grpc?
		return false
	default:
		return true
	}
}
590
// isUnavailableErr returns true if the given error is an unavailable error,
// i.e. a gRPC status error with code Unavailable (the system will be right
// back, e.g. can't connect or lost leader). It is always false when ctx is
// done or err is nil.
func isUnavailableErr(ctx context.Context, err error) bool {
	if (ctx != nil && ctx.Err() != nil) || err == nil {
		return false
	}
	ev, ok := status.FromError(err)
	return ok && ev.Code() == codes.Unavailable
}
607
// toErr converts a gRPC error into the error returned to clientv3 callers.
//
// The error is first mapped with rpctypes.Error; etcd errors are returned
// as-is. For gRPC DeadlineExceeded or Canceled statuses, ctx's own error is
// returned instead when ctx is done. ctx must be non-nil in that case.
func toErr(ctx context.Context, err error) error {
	if err == nil {
		return nil
	}
	err = rpctypes.Error(err)
	if _, isEtcd := err.(rpctypes.EtcdError); isEtcd {
		return err
	}
	ev, isStatus := status.FromError(err)
	if !isStatus {
		return err
	}
	switch ev.Code() {
	case codes.DeadlineExceeded, codes.Canceled:
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
	}
	return err
}
629
// canceledByCaller reports whether err is exactly context.Canceled or
// context.DeadlineExceeded while stopCtx is already done. This indicates
// the operation was stopped by its caller rather than failing on its own.
// stopCtx must be non-nil.
func canceledByCaller(stopCtx context.Context, err error) bool {
	if stopCtx.Err() == nil || err == nil {
		return false
	}
	switch err {
	case context.Canceled, context.DeadlineExceeded:
		return true
	default:
		return false
	}
}
637
// IsConnCanceled returns true, if error is from a closed gRPC connection.
// ref. https://github.com/grpc/grpc-go/pull/1854
//
// The check covers several gRPC generations:
//   - >= v1.23.x: a status error with code Canceled or the message
//     "transport is closing";
//   - >= v1.10.x: context.Canceled itself;
//   - <= v1.7.x: the plain "grpc: the client connection is closing" error.
func IsConnCanceled(err error) bool {
	if err == nil {
		return false
	}
	if s, isStatus := status.FromError(err); isStatus {
		// connection is canceled or server has already closed the connection
		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
	}
	return err == context.Canceled ||
		strings.Contains(err.Error(), "grpc: the client connection is closing")
}
660
// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
//
// When the credentials handed to dialSetupOpts implement this interface,
// their Dialer is installed with grpc.WithContextDialer in place of the
// default endpoint.Dialer.
type TransportCredentialsWithDialer interface {
	grpccredentials.TransportCredentials
	// Dialer opens the connection to dialEp; it serves as the gRPC context
	// dialer for connections that use these credentials.
	Dialer(ctx context.Context, dialEp string) (net.Conn, error)
}