blob: b91cbf958ef850f0039199c99a97077e541ccf9d [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "crypto/tls"
20 "errors"
21 "fmt"
22 "net"
23 "os"
24 "strconv"
25 "strings"
26 "sync"
27 "time"
28
29 "github.com/google/uuid"
30 "go.etcd.io/etcd/clientv3/balancer"
31 "go.etcd.io/etcd/clientv3/balancer/picker"
32 "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
33 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
34 "go.etcd.io/etcd/pkg/logutil"
35 "go.uber.org/zap"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/keepalive"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/status"
42)
43
44var (
45 ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
46 ErrOldCluster = errors.New("etcdclient: old cluster version")
47
48 roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
49)
50
51func init() {
52 lg := zap.NewNop()
53 if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
54 var err error
55 lg, err = zap.NewProductionConfig().Build() // info level logging
56 if err != nil {
57 panic(err)
58 }
59 }
60 balancer.RegisterBuilder(balancer.Config{
61 Policy: picker.RoundrobinBalanced,
62 Name: roundRobinBalancerName,
63 Logger: lg,
64 })
65}
66
// Client provides and manages an etcd v3 client session.
//
// The embedded interfaces (Cluster, KV, Lease, Watcher, Auth, Maintenance)
// expose the etcd v3 APIs; newClient assigns an implementation to each.
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// conn is the gRPC connection dialed by newClient; ActiveConnection
	// returns it and Close closes it.
	conn *grpc.ClientConn

	// cfg is a copy of the configuration the client was created with.
	// Endpoints and SetEndpoints hold mu while accessing cfg.Endpoints.
	cfg Config
	// creds are the TLS transport credentials built from cfg.TLS, or nil when
	// no TLS config was given.
	creds *credentials.TransportCredentials
	// resolverGroup is the endpoint resolver group that balanced dial targets
	// are built from; SetEndpoints pushes endpoint updates to it.
	resolverGroup *endpoint.ResolverGroup
	// mu protects cfg.Endpoints (see Endpoints / SetEndpoints).
	mu *sync.RWMutex

	// ctx is the client's long-lived context; cancel cancels it in Close.
	ctx    context.Context
	cancel context.CancelFunc

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password string
	// tokenCred is an instance of WithPerRPCCredentials()'s argument
	tokenCred *authTokenCredential

	// callOpts are the default gRPC call options; newClient replaces the
	// message size limits when cfg sets them.
	callOpts []grpc.CallOption

	// lg is the client's logger, built from cfg.LogConfig or the default.
	lg *zap.Logger
}
97
98// New creates a new etcdv3 client from a given configuration.
99func New(cfg Config) (*Client, error) {
100 if len(cfg.Endpoints) == 0 {
101 return nil, ErrNoAvailableEndpoints
102 }
103
104 return newClient(&cfg)
105}
106
107// NewCtxClient creates a client with a context but no underlying grpc
108// connection. This is useful for embedded cases that override the
109// service interface implementations and do not need connection management.
110func NewCtxClient(ctx context.Context) *Client {
111 cctx, cancel := context.WithCancel(ctx)
112 return &Client{ctx: cctx, cancel: cancel}
113}
114
115// NewFromURL creates a new etcdv3 client from a URL.
116func NewFromURL(url string) (*Client, error) {
117 return New(Config{Endpoints: []string{url}})
118}
119
120// NewFromURLs creates a new etcdv3 client from URLs.
121func NewFromURLs(urls []string) (*Client, error) {
122 return New(Config{Endpoints: urls})
123}
124
125// Close shuts down the client's etcd connections.
126func (c *Client) Close() error {
127 c.cancel()
128 c.Watcher.Close()
129 c.Lease.Close()
130 if c.resolverGroup != nil {
131 c.resolverGroup.Close()
132 }
133 if c.conn != nil {
134 return toErr(c.ctx, c.conn.Close())
135 }
136 return c.ctx.Err()
137}
138
139// Ctx is a context for "out of band" messages (e.g., for sending
140// "clean up" message when another context is canceled). It is
141// canceled on client Close().
142func (c *Client) Ctx() context.Context { return c.ctx }
143
144// Endpoints lists the registered endpoints for the client.
Abhilash S.L3b494632019-07-16 15:51:09 +0530145func (c *Client) Endpoints() []string {
William Kurkianea869482019-04-09 15:16:11 -0400146 // copy the slice; protect original endpoints from being changed
Abhilash S.L3b494632019-07-16 15:51:09 +0530147 c.mu.RLock()
148 defer c.mu.RUnlock()
149 eps := make([]string, len(c.cfg.Endpoints))
William Kurkianea869482019-04-09 15:16:11 -0400150 copy(eps, c.cfg.Endpoints)
Abhilash S.L3b494632019-07-16 15:51:09 +0530151 return eps
William Kurkianea869482019-04-09 15:16:11 -0400152}
153
154// SetEndpoints updates client's endpoints.
155func (c *Client) SetEndpoints(eps ...string) {
156 c.mu.Lock()
157 defer c.mu.Unlock()
158 c.cfg.Endpoints = eps
159 c.resolverGroup.SetEndpoints(eps)
160}
161
162// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
163func (c *Client) Sync(ctx context.Context) error {
164 mresp, err := c.MemberList(ctx)
165 if err != nil {
166 return err
167 }
168 var eps []string
169 for _, m := range mresp.Members {
170 eps = append(eps, m.ClientURLs...)
171 }
172 c.SetEndpoints(eps...)
173 return nil
174}
175
176func (c *Client) autoSync() {
177 if c.cfg.AutoSyncInterval == time.Duration(0) {
178 return
179 }
180
181 for {
182 select {
183 case <-c.ctx.Done():
184 return
185 case <-time.After(c.cfg.AutoSyncInterval):
186 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
187 err := c.Sync(ctx)
188 cancel()
189 if err != nil && err != c.ctx.Err() {
190 lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
191 }
192 }
193 }
194}
195
// authTokenCredential carries the client's auth token; dial passes a pointer
// to it to grpc.WithPerRPCCredentials so the token is attached to RPCs.
type authTokenCredential struct {
	// token is the current auth token; getToken replaces it after a
	// successful Authenticate call.
	token string
	// tokenMu guards token.
	tokenMu *sync.RWMutex
}
200
201func (cred authTokenCredential) RequireTransportSecurity() bool {
202 return false
203}
204
205func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
206 cred.tokenMu.RLock()
207 defer cred.tokenMu.RUnlock()
208 return map[string]string{
209 rpctypes.TokenFieldNameGRPC: cred.token,
210 }, nil
211}
212
213func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
214 creds = c.creds
215 switch scheme {
216 case "unix":
217 case "http":
218 creds = nil
219 case "https", "unixs":
220 if creds != nil {
221 break
222 }
223 tlsconfig := &tls.Config{}
224 emptyCreds := credentials.NewTLS(tlsconfig)
225 creds = &emptyCreds
226 default:
227 creds = nil
228 }
229 return creds
230}
231
232// dialSetupOpts gives the dial opts prior to any authentication.
233func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
234 if c.cfg.DialKeepAliveTime > 0 {
235 params := keepalive.ClientParameters{
236 Time: c.cfg.DialKeepAliveTime,
237 Timeout: c.cfg.DialKeepAliveTimeout,
238 PermitWithoutStream: c.cfg.PermitWithoutStream,
239 }
240 opts = append(opts, grpc.WithKeepaliveParams(params))
241 }
242 opts = append(opts, dopts...)
243
244 // Provide a net dialer that supports cancelation and timeout.
245 f := func(dialEp string, t time.Duration) (net.Conn, error) {
246 proto, host, _ := endpoint.ParseEndpoint(dialEp)
247 select {
248 case <-c.ctx.Done():
249 return nil, c.ctx.Err()
250 default:
251 }
252 dialer := &net.Dialer{Timeout: t}
253 return dialer.DialContext(c.ctx, proto, host)
254 }
255 opts = append(opts, grpc.WithDialer(f))
256
257 if creds != nil {
258 opts = append(opts, grpc.WithTransportCredentials(*creds))
259 } else {
260 opts = append(opts, grpc.WithInsecure())
261 }
262
263 // Interceptor retry and backoff.
264 // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
265 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
266 // once it is available.
267 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
268 opts = append(opts,
269 // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
270 // Streams that are safe to retry are enabled individually.
271 grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
272 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
273 )
274
275 return opts, nil
276}
277
278// Dial connects to a single endpoint using the client's config.
279func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
280 creds := c.directDialCreds(ep)
281 // Use the grpc passthrough resolver to directly dial a single endpoint.
282 // This resolver passes through the 'unix' and 'unixs' endpoints schemes used
283 // by etcd without modification, allowing us to directly dial endpoints and
284 // using the same dial functions that we use for load balancer dialing.
285 return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
286}
287
288func (c *Client) getToken(ctx context.Context) error {
289 var err error // return last error in a case of fail
290 var auth *authenticator
291
292 for i := 0; i < len(c.cfg.Endpoints); i++ {
293 ep := c.cfg.Endpoints[i]
294 // use dial options without dopts to avoid reusing the client balancer
295 var dOpts []grpc.DialOption
296 _, host, _ := endpoint.ParseEndpoint(ep)
297 target := c.resolverGroup.Target(host)
298 creds := c.dialWithBalancerCreds(ep)
299 dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
300 if err != nil {
301 err = fmt.Errorf("failed to configure auth dialer: %v", err)
302 continue
303 }
304 dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))
305 auth, err = newAuthenticator(ctx, target, dOpts, c)
306 if err != nil {
307 continue
308 }
309 defer auth.close()
310
311 var resp *AuthenticateResponse
312 resp, err = auth.authenticate(ctx, c.Username, c.Password)
313 if err != nil {
314 // return err without retrying other endpoints
315 if err == rpctypes.ErrAuthNotEnabled {
316 return err
317 }
318 continue
319 }
320
321 c.tokenCred.tokenMu.Lock()
322 c.tokenCred.token = resp.Token
323 c.tokenCred.tokenMu.Unlock()
324
325 return nil
326 }
327
328 return err
329}
330
331// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
332// of the provided endpoint determines the scheme used for all endpoints of the client connection.
333func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
334 _, host, _ := endpoint.ParseEndpoint(ep)
335 target := c.resolverGroup.Target(host)
336 creds := c.dialWithBalancerCreds(ep)
337 return c.dial(target, creds, dopts...)
338}
339
// dial configures and dials any grpc balancer target.
//
// When both Username and Password are set, a fresh authTokenCredential is
// stored in c.tokenCred and getToken is called, bounded by cfg.DialTimeout
// when it is positive. On success the credential is attached to every RPC
// via grpc.WithPerRPCCredentials. If the server reports that auth is not
// enabled, the dial proceeds without it. Any other error aborts the dial.
// cfg.DialOptions are then appended, and the connection is created with
// grpc.DialContext, whose context is also bounded by cfg.DialTimeout when
// positive.
//
// NOTE(review): c.tokenCred is replaced on every call (including via Dial), so
// connections dialed earlier keep pointing at the previous credential object —
// confirm this is intended.
func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
	opts, err := c.dialSetupOpts(creds, dopts...)
	if err != nil {
		return nil, fmt.Errorf("failed to configure dialer: %v", err)
	}

	if c.Username != "" && c.Password != "" {
		c.tokenCred = &authTokenCredential{
			tokenMu: &sync.RWMutex{},
		}

		// Bound the token fetch by DialTimeout when one is configured; the
		// no-op cancel keeps the cleanup below uniform.
		ctx, cancel := c.ctx, func() {}
		if c.cfg.DialTimeout > 0 {
			ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
		}

		err = c.getToken(ctx)
		if err != nil {
			if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
				// When the failure is the timeout context's own error rather than
				// one inherited from the client context, report it as
				// context.DeadlineExceeded.
				if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
					err = context.DeadlineExceeded
				}
				cancel()
				return nil, err
			}
		} else {
			opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
		}
		cancel()
	}

	// User-supplied dial options go after the generated ones.
	opts = append(opts, c.cfg.DialOptions...)

	dctx := c.ctx
	if c.cfg.DialTimeout > 0 {
		var cancel context.CancelFunc
		dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
		defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
	}

	conn, err := grpc.DialContext(dctx, target, opts...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
387
388func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
389 _, hostPort, scheme := endpoint.ParseEndpoint(ep)
390 creds := c.creds
391 if len(scheme) != 0 {
392 creds = c.processCreds(scheme)
393 if creds != nil {
394 c := *creds
395 clone := c.Clone()
396 // Set the server name must to the endpoint hostname without port since grpc
397 // otherwise attempts to check if x509 cert is valid for the full endpoint
398 // including the scheme and port, which fails.
399 host, _ := endpoint.ParseHostPort(hostPort)
400 clone.OverrideServerName(host)
401 creds = &clone
402 }
403 }
404 return creds
405}
406
407func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
408 _, _, scheme := endpoint.ParseEndpoint(ep)
409 creds := c.creds
410 if len(scheme) != 0 {
411 creds = c.processCreds(scheme)
412 }
413 return creds
414}
415
416// WithRequireLeader requires client requests to only succeed
417// when the cluster has a leader.
418func WithRequireLeader(ctx context.Context) context.Context {
419 md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
420 return metadata.NewOutgoingContext(ctx, md)
421}
422
423func newClient(cfg *Config) (*Client, error) {
424 if cfg == nil {
425 cfg = &Config{}
426 }
427 var creds *credentials.TransportCredentials
428 if cfg.TLS != nil {
429 c := credentials.NewTLS(cfg.TLS)
430 creds = &c
431 }
432
433 // use a temporary skeleton client to bootstrap first connection
434 baseCtx := context.TODO()
435 if cfg.Context != nil {
436 baseCtx = cfg.Context
437 }
438
439 ctx, cancel := context.WithCancel(baseCtx)
440 client := &Client{
441 conn: nil,
442 cfg: *cfg,
443 creds: creds,
444 ctx: ctx,
445 cancel: cancel,
Abhilash S.L3b494632019-07-16 15:51:09 +0530446 mu: new(sync.RWMutex),
William Kurkianea869482019-04-09 15:16:11 -0400447 callOpts: defaultCallOpts,
448 }
449
450 lcfg := logutil.DefaultZapLoggerConfig
451 if cfg.LogConfig != nil {
452 lcfg = *cfg.LogConfig
453 }
454 var err error
455 client.lg, err = lcfg.Build()
456 if err != nil {
457 return nil, err
458 }
459
460 if cfg.Username != "" && cfg.Password != "" {
461 client.Username = cfg.Username
462 client.Password = cfg.Password
463 }
464 if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
465 if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
466 return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
467 }
468 callOpts := []grpc.CallOption{
469 defaultFailFast,
470 defaultMaxCallSendMsgSize,
471 defaultMaxCallRecvMsgSize,
472 }
473 if cfg.MaxCallSendMsgSize > 0 {
474 callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
475 }
476 if cfg.MaxCallRecvMsgSize > 0 {
477 callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
478 }
479 client.callOpts = callOpts
480 }
481
482 // Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
483 // to dial so the client knows to use this resolver.
484 client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
485 if err != nil {
486 client.cancel()
487 return nil, err
488 }
489 client.resolverGroup.SetEndpoints(cfg.Endpoints)
490
491 if len(cfg.Endpoints) < 1 {
492 return nil, fmt.Errorf("at least one Endpoint must is required in client config")
493 }
494 dialEndpoint := cfg.Endpoints[0]
495
496 // Use a provided endpoint target so that for https:// without any tls config given, then
497 // grpc will assume the certificate server name is the endpoint host.
498 conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
499 if err != nil {
500 client.cancel()
501 client.resolverGroup.Close()
502 return nil, err
503 }
504 // TODO: With the old grpc balancer interface, we waited until the dial timeout
505 // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
506 client.conn = conn
507
508 client.Cluster = NewCluster(client)
509 client.KV = NewKV(client)
510 client.Lease = NewLease(client)
511 client.Watcher = NewWatcher(client)
512 client.Auth = NewAuth(client)
513 client.Maintenance = NewMaintenance(client)
514
515 if cfg.RejectOldCluster {
516 if err := client.checkVersion(); err != nil {
517 client.Close()
518 return nil, err
519 }
520 }
521
522 go client.autoSync()
523 return client, nil
524}
525
526// roundRobinQuorumBackoff retries against quorum between each backoff.
527// This is intended for use with a round robin load balancer.
528func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
529 return func(attempt uint) time.Duration {
530 // after each round robin across quorum, backoff for our wait between duration
531 n := uint(len(c.Endpoints()))
532 quorum := (n/2 + 1)
533 if attempt%quorum == 0 {
534 c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
535 return jitterUp(waitBetween, jitterFraction)
536 }
537 c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
538 return 0
539 }
540}
541
542func (c *Client) checkVersion() (err error) {
543 var wg sync.WaitGroup
544 errc := make(chan error, len(c.cfg.Endpoints))
545 ctx, cancel := context.WithCancel(c.ctx)
546 if c.cfg.DialTimeout > 0 {
547 ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
548 }
549 wg.Add(len(c.cfg.Endpoints))
550 for _, ep := range c.cfg.Endpoints {
551 // if cluster is current, any endpoint gives a recent version
552 go func(e string) {
553 defer wg.Done()
554 resp, rerr := c.Status(ctx, e)
555 if rerr != nil {
556 errc <- rerr
557 return
558 }
559 vs := strings.Split(resp.Version, ".")
560 maj, min := 0, 0
561 if len(vs) >= 2 {
562 maj, _ = strconv.Atoi(vs[0])
563 min, rerr = strconv.Atoi(vs[1])
564 }
565 if maj < 3 || (maj == 3 && min < 2) {
566 rerr = ErrOldCluster
567 }
568 errc <- rerr
569 }(ep)
570 }
571 // wait for success
572 for i := 0; i < len(c.cfg.Endpoints); i++ {
573 if err = <-errc; err == nil {
574 break
575 }
576 }
577 cancel()
578 wg.Wait()
579 return err
580}
581
582// ActiveConnection returns the current in-use connection
583func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
584
585// isHaltErr returns true if the given error and context indicate no forward
586// progress can be made, even after reconnecting.
587func isHaltErr(ctx context.Context, err error) bool {
588 if ctx != nil && ctx.Err() != nil {
589 return true
590 }
591 if err == nil {
592 return false
593 }
594 ev, _ := status.FromError(err)
595 // Unavailable codes mean the system will be right back.
596 // (e.g., can't connect, lost leader)
597 // Treat Internal codes as if something failed, leaving the
598 // system in an inconsistent state, but retrying could make progress.
599 // (e.g., failed in middle of send, corrupted frame)
600 // TODO: are permanent Internal errors possible from grpc?
601 return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
602}
603
604// isUnavailableErr returns true if the given error is an unavailable error
605func isUnavailableErr(ctx context.Context, err error) bool {
606 if ctx != nil && ctx.Err() != nil {
607 return false
608 }
609 if err == nil {
610 return false
611 }
612 ev, _ := status.FromError(err)
613 // Unavailable codes mean the system will be right back.
614 // (e.g., can't connect, lost leader)
615 return ev.Code() == codes.Unavailable
616}
617
618func toErr(ctx context.Context, err error) error {
619 if err == nil {
620 return nil
621 }
622 err = rpctypes.Error(err)
623 if _, ok := err.(rpctypes.EtcdError); ok {
624 return err
625 }
626 if ev, ok := status.FromError(err); ok {
627 code := ev.Code()
628 switch code {
629 case codes.DeadlineExceeded:
630 fallthrough
631 case codes.Canceled:
632 if ctx.Err() != nil {
633 err = ctx.Err()
634 }
635 case codes.Unavailable:
636 case codes.FailedPrecondition:
637 err = grpc.ErrClientConnClosing
638 }
639 }
640 return err
641}
642
// canceledByCaller reports whether stopCtx is done and err is exactly
// context.Canceled or context.DeadlineExceeded (compared by identity, not
// unwrapped). It is false while stopCtx is still active or when err is nil.
func canceledByCaller(stopCtx context.Context, err error) bool {
	if stopCtx.Err() == nil || err == nil {
		return false
	}
	switch err {
	case context.Canceled, context.DeadlineExceeded:
		return true
	default:
		return false
	}
}
650
651// IsConnCanceled returns true, if error is from a closed gRPC connection.
652// ref. https://github.com/grpc/grpc-go/pull/1854
653func IsConnCanceled(err error) bool {
654 if err == nil {
655 return false
656 }
657 // >= gRPC v1.10.x
658 s, ok := status.FromError(err)
659 if ok {
660 // connection is canceled or server has already closed the connection
661 return s.Code() == codes.Canceled || s.Message() == "transport is closing"
662 }
663 // >= gRPC v1.10.x
664 if err == context.Canceled {
665 return true
666 }
667 // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
668 return strings.Contains(err.Error(), "grpc: the client connection is closing")
669}