blob: de9ab798e4871df25de845fe0d5fb8f281418b58 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package client
16
17import (
18 "context"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "io/ioutil"
23 "math/rand"
24 "net"
25 "net/http"
26 "net/url"
27 "sort"
28 "strconv"
29 "sync"
30 "time"
31
32 "go.etcd.io/etcd/version"
33)
34
35var (
36 ErrNoEndpoints = errors.New("client: no endpoints available")
37 ErrTooManyRedirects = errors.New("client: too many redirects")
38 ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
39 ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
40 errTooManyRedirectChecks = errors.New("client: too many redirect checks")
41
42 // oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
43 // that Do() will not retry a request
44 oneShotCtxValue interface{}
45)
46
47var DefaultRequestTimeout = 5 * time.Second
48
49var DefaultTransport CancelableTransport = &http.Transport{
50 Proxy: http.ProxyFromEnvironment,
51 Dial: (&net.Dialer{
52 Timeout: 30 * time.Second,
53 KeepAlive: 30 * time.Second,
54 }).Dial,
55 TLSHandshakeTimeout: 10 * time.Second,
56}
57
// EndpointSelectionMode selects the policy a Client uses to decide which
// cluster member receives its requests.
type EndpointSelectionMode int

const (
	// EndpointSelectionRandom is the zero value and therefore the default
	// SelectionMode. The endpoint list is kept in shuffled order, so any
	// member may end up serving requests: with members A, B and C, the client
	// may pick any one of the three as the destination.
	EndpointSelectionRandom EndpointSelectionMode = iota

	// EndpointSelectionPrioritizeLeader sends requests straight to the
	// cluster leader. This saves the extra round trip a follower would need
	// to forward them to the leader. If the leader fails, however, a client
	// in this mode has no way to rank the remaining followers. It must
	// therefore keep its view of the cluster current by running
	// Client.AutoSync.
	EndpointSelectionPrioritizeLeader
)
80
81type Config struct {
82 // Endpoints defines a set of URLs (schemes, hosts and ports only)
83 // that can be used to communicate with a logical etcd cluster. For
84 // example, a three-node cluster could be provided like so:
85 //
86 // Endpoints: []string{
87 // "http://node1.example.com:2379",
88 // "http://node2.example.com:2379",
89 // "http://node3.example.com:2379",
90 // }
91 //
92 // If multiple endpoints are provided, the Client will attempt to
93 // use them all in the event that one or more of them are unusable.
94 //
95 // If Client.Sync is ever called, the Client may cache an alternate
96 // set of endpoints to continue operation.
97 Endpoints []string
98
99 // Transport is used by the Client to drive HTTP requests. If not
100 // provided, DefaultTransport will be used.
101 Transport CancelableTransport
102
103 // CheckRedirect specifies the policy for handling HTTP redirects.
104 // If CheckRedirect is not nil, the Client calls it before
105 // following an HTTP redirect. The sole argument is the number of
106 // requests that have already been made. If CheckRedirect returns
107 // an error, Client.Do will not make any further requests and return
108 // the error back it to the caller.
109 //
110 // If CheckRedirect is nil, the Client uses its default policy,
111 // which is to stop after 10 consecutive requests.
112 CheckRedirect CheckRedirectFunc
113
114 // Username specifies the user credential to add as an authorization header
115 Username string
116
117 // Password is the password for the specified user to add as an authorization header
118 // to the request.
119 Password string
120
121 // HeaderTimeoutPerRequest specifies the time limit to wait for response
122 // header in a single request made by the Client. The timeout includes
123 // connection time, any redirects, and header wait time.
124 //
125 // For non-watch GET request, server returns the response body immediately.
126 // For PUT/POST/DELETE request, server will attempt to commit request
127 // before responding, which is expected to take `100ms + 2 * RTT`.
128 // For watch request, server returns the header immediately to notify Client
129 // watch start. But if server is behind some kind of proxy, the response
130 // header may be cached at proxy, and Client cannot rely on this behavior.
131 //
132 // Especially, wait request will ignore this timeout.
133 //
134 // One API call may send multiple requests to different etcd servers until it
135 // succeeds. Use context of the API to specify the overall timeout.
136 //
137 // A HeaderTimeoutPerRequest of zero means no timeout.
138 HeaderTimeoutPerRequest time.Duration
139
140 // SelectionMode is an EndpointSelectionMode enum that specifies the
141 // policy for choosing the etcd cluster node to which requests are sent.
142 SelectionMode EndpointSelectionMode
143}
144
145func (cfg *Config) transport() CancelableTransport {
146 if cfg.Transport == nil {
147 return DefaultTransport
148 }
149 return cfg.Transport
150}
151
152func (cfg *Config) checkRedirect() CheckRedirectFunc {
153 if cfg.CheckRedirect == nil {
154 return DefaultCheckRedirect
155 }
156 return cfg.CheckRedirect
157}
158
159// CancelableTransport mimics net/http.Transport, but requires that
160// the object also support request cancellation.
161type CancelableTransport interface {
162 http.RoundTripper
163 CancelRequest(req *http.Request)
164}
165
166type CheckRedirectFunc func(via int) error
167
168// DefaultCheckRedirect follows up to 10 redirects, but no more.
169var DefaultCheckRedirect CheckRedirectFunc = func(via int) error {
170 if via > 10 {
171 return ErrTooManyRedirects
172 }
173 return nil
174}
175
176type Client interface {
177 // Sync updates the internal cache of the etcd cluster's membership.
178 Sync(context.Context) error
179
180 // AutoSync periodically calls Sync() every given interval.
181 // The recommended sync interval is 10 seconds to 1 minute, which does
182 // not bring too much overhead to server and makes client catch up the
183 // cluster change in time.
184 //
185 // The example to use it:
186 //
187 // for {
188 // err := client.AutoSync(ctx, 10*time.Second)
189 // if err == context.DeadlineExceeded || err == context.Canceled {
190 // break
191 // }
192 // log.Print(err)
193 // }
194 AutoSync(context.Context, time.Duration) error
195
196 // Endpoints returns a copy of the current set of API endpoints used
197 // by Client to resolve HTTP requests. If Sync has ever been called,
198 // this may differ from the initial Endpoints provided in the Config.
199 Endpoints() []string
200
201 // SetEndpoints sets the set of API endpoints used by Client to resolve
202 // HTTP requests. If the given endpoints are not valid, an error will be
203 // returned
204 SetEndpoints(eps []string) error
205
206 // GetVersion retrieves the current etcd server and cluster version
207 GetVersion(ctx context.Context) (*version.Versions, error)
208
209 httpClient
210}
211
212func New(cfg Config) (Client, error) {
213 c := &httpClusterClient{
214 clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
215 rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
216 selectionMode: cfg.SelectionMode,
217 }
218 if cfg.Username != "" {
219 c.credentials = &credentials{
220 username: cfg.Username,
221 password: cfg.Password,
222 }
223 }
224 if err := c.SetEndpoints(cfg.Endpoints); err != nil {
225 return nil, err
226 }
227 return c, nil
228}
229
230type httpClient interface {
231 Do(context.Context, httpAction) (*http.Response, []byte, error)
232}
233
234func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory {
235 return func(ep url.URL) httpClient {
236 return &redirectFollowingHTTPClient{
237 checkRedirect: cr,
238 client: &simpleHTTPClient{
239 transport: tr,
240 endpoint: ep,
241 headerTimeout: headerTimeout,
242 },
243 }
244 }
245}
246
247type credentials struct {
248 username string
249 password string
250}
251
252type httpClientFactory func(url.URL) httpClient
253
254type httpAction interface {
255 HTTPRequest(url.URL) *http.Request
256}
257
258type httpClusterClient struct {
259 clientFactory httpClientFactory
260 endpoints []url.URL
261 pinned int
262 credentials *credentials
263 sync.RWMutex
264 rand *rand.Rand
265 selectionMode EndpointSelectionMode
266}
267
268func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) {
269 ceps := make([]url.URL, len(eps))
270 copy(ceps, eps)
271
272 // To perform a lookup on the new endpoint list without using the current
273 // client, we'll copy it
274 clientCopy := &httpClusterClient{
275 clientFactory: c.clientFactory,
276 credentials: c.credentials,
277 rand: c.rand,
278
279 pinned: 0,
280 endpoints: ceps,
281 }
282
283 mAPI := NewMembersAPI(clientCopy)
284 leader, err := mAPI.Leader(ctx)
285 if err != nil {
286 return "", err
287 }
288 if len(leader.ClientURLs) == 0 {
289 return "", ErrNoLeaderEndpoint
290 }
291
292 return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs?
293}
294
295func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) {
296 if len(eps) == 0 {
297 return []url.URL{}, ErrNoEndpoints
298 }
299
300 neps := make([]url.URL, len(eps))
301 for i, ep := range eps {
302 u, err := url.Parse(ep)
303 if err != nil {
304 return []url.URL{}, err
305 }
306 neps[i] = *u
307 }
308 return neps, nil
309}
310
311func (c *httpClusterClient) SetEndpoints(eps []string) error {
312 neps, err := c.parseEndpoints(eps)
313 if err != nil {
314 return err
315 }
316
317 c.Lock()
318 defer c.Unlock()
319
320 c.endpoints = shuffleEndpoints(c.rand, neps)
321 // We're not doing anything for PrioritizeLeader here. This is
322 // due to not having a context meaning we can't call getLeaderEndpoint
323 // However, if you're using PrioritizeLeader, you've already been told
324 // to regularly call sync, where we do have a ctx, and can figure the
325 // leader. PrioritizeLeader is also quite a loose guarantee, so deal
326 // with it
327 c.pinned = 0
328
329 return nil
330}
331
332func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
333 action := act
334 c.RLock()
335 leps := len(c.endpoints)
336 eps := make([]url.URL, leps)
337 n := copy(eps, c.endpoints)
338 pinned := c.pinned
339
340 if c.credentials != nil {
341 action = &authedAction{
342 act: act,
343 credentials: *c.credentials,
344 }
345 }
346 c.RUnlock()
347
348 if leps == 0 {
349 return nil, nil, ErrNoEndpoints
350 }
351
352 if leps != n {
353 return nil, nil, errors.New("unable to pick endpoint: copy failed")
354 }
355
356 var resp *http.Response
357 var body []byte
358 var err error
359 cerr := &ClusterError{}
360 isOneShot := ctx.Value(&oneShotCtxValue) != nil
361
362 for i := pinned; i < leps+pinned; i++ {
363 k := i % leps
364 hc := c.clientFactory(eps[k])
365 resp, body, err = hc.Do(ctx, action)
366 if err != nil {
367 cerr.Errors = append(cerr.Errors, err)
368 if err == ctx.Err() {
369 return nil, nil, ctx.Err()
370 }
371 if err == context.Canceled || err == context.DeadlineExceeded {
372 return nil, nil, err
373 }
374 } else if resp.StatusCode/100 == 5 {
375 switch resp.StatusCode {
376 case http.StatusInternalServerError, http.StatusServiceUnavailable:
377 // TODO: make sure this is a no leader response
378 cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
379 default:
380 cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
381 }
382 err = cerr.Errors[0]
383 }
384 if err != nil {
385 if !isOneShot {
386 continue
387 }
388 c.Lock()
389 c.pinned = (k + 1) % leps
390 c.Unlock()
391 return nil, nil, err
392 }
393 if k != pinned {
394 c.Lock()
395 c.pinned = k
396 c.Unlock()
397 }
398 return resp, body, nil
399 }
400
401 return nil, nil, cerr
402}
403
404func (c *httpClusterClient) Endpoints() []string {
405 c.RLock()
406 defer c.RUnlock()
407
408 eps := make([]string, len(c.endpoints))
409 for i, ep := range c.endpoints {
410 eps[i] = ep.String()
411 }
412
413 return eps
414}
415
416func (c *httpClusterClient) Sync(ctx context.Context) error {
417 mAPI := NewMembersAPI(c)
418 ms, err := mAPI.List(ctx)
419 if err != nil {
420 return err
421 }
422
423 var eps []string
424 for _, m := range ms {
425 eps = append(eps, m.ClientURLs...)
426 }
427
428 neps, err := c.parseEndpoints(eps)
429 if err != nil {
430 return err
431 }
432
433 npin := 0
434
435 switch c.selectionMode {
436 case EndpointSelectionRandom:
437 c.RLock()
438 eq := endpointsEqual(c.endpoints, neps)
439 c.RUnlock()
440
441 if eq {
442 return nil
443 }
444 // When items in the endpoint list changes, we choose a new pin
445 neps = shuffleEndpoints(c.rand, neps)
446 case EndpointSelectionPrioritizeLeader:
447 nle, err := c.getLeaderEndpoint(ctx, neps)
448 if err != nil {
449 return ErrNoLeaderEndpoint
450 }
451
452 for i, n := range neps {
453 if n.String() == nle {
454 npin = i
455 break
456 }
457 }
458 default:
459 return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
460 }
461
462 c.Lock()
463 defer c.Unlock()
464 c.endpoints = neps
465 c.pinned = npin
466
467 return nil
468}
469
470func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
471 ticker := time.NewTicker(interval)
472 defer ticker.Stop()
473 for {
474 err := c.Sync(ctx)
475 if err != nil {
476 return err
477 }
478 select {
479 case <-ctx.Done():
480 return ctx.Err()
481 case <-ticker.C:
482 }
483 }
484}
485
486func (c *httpClusterClient) GetVersion(ctx context.Context) (*version.Versions, error) {
487 act := &getAction{Prefix: "/version"}
488
489 resp, body, err := c.Do(ctx, act)
490 if err != nil {
491 return nil, err
492 }
493
494 switch resp.StatusCode {
495 case http.StatusOK:
496 if len(body) == 0 {
497 return nil, ErrEmptyBody
498 }
499 var vresp version.Versions
500 if err := json.Unmarshal(body, &vresp); err != nil {
501 return nil, ErrInvalidJSON
502 }
503 return &vresp, nil
504 default:
505 var etcdErr Error
506 if err := json.Unmarshal(body, &etcdErr); err != nil {
507 return nil, ErrInvalidJSON
508 }
509 return nil, etcdErr
510 }
511}
512
513type roundTripResponse struct {
514 resp *http.Response
515 err error
516}
517
518type simpleHTTPClient struct {
519 transport CancelableTransport
520 endpoint url.URL
521 headerTimeout time.Duration
522}
523
524func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
525 req := act.HTTPRequest(c.endpoint)
526
527 if err := printcURL(req); err != nil {
528 return nil, nil, err
529 }
530
531 isWait := false
532 if req != nil && req.URL != nil {
533 ws := req.URL.Query().Get("wait")
534 if len(ws) != 0 {
535 var err error
536 isWait, err = strconv.ParseBool(ws)
537 if err != nil {
538 return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
539 }
540 }
541 }
542
543 var hctx context.Context
544 var hcancel context.CancelFunc
545 if !isWait && c.headerTimeout > 0 {
546 hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
547 } else {
548 hctx, hcancel = context.WithCancel(ctx)
549 }
550 defer hcancel()
551
552 reqcancel := requestCanceler(c.transport, req)
553
554 rtchan := make(chan roundTripResponse, 1)
555 go func() {
556 resp, err := c.transport.RoundTrip(req)
557 rtchan <- roundTripResponse{resp: resp, err: err}
558 close(rtchan)
559 }()
560
561 var resp *http.Response
562 var err error
563
564 select {
565 case rtresp := <-rtchan:
566 resp, err = rtresp.resp, rtresp.err
567 case <-hctx.Done():
568 // cancel and wait for request to actually exit before continuing
569 reqcancel()
570 rtresp := <-rtchan
571 resp = rtresp.resp
572 switch {
573 case ctx.Err() != nil:
574 err = ctx.Err()
575 case hctx.Err() != nil:
576 err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
577 default:
578 panic("failed to get error from context")
579 }
580 }
581
582 // always check for resp nil-ness to deal with possible
583 // race conditions between channels above
584 defer func() {
585 if resp != nil {
586 resp.Body.Close()
587 }
588 }()
589
590 if err != nil {
591 return nil, nil, err
592 }
593
594 var body []byte
595 done := make(chan struct{})
596 go func() {
597 body, err = ioutil.ReadAll(resp.Body)
598 done <- struct{}{}
599 }()
600
601 select {
602 case <-ctx.Done():
603 resp.Body.Close()
604 <-done
605 return nil, nil, ctx.Err()
606 case <-done:
607 }
608
609 return resp, body, err
610}
611
612type authedAction struct {
613 act httpAction
614 credentials credentials
615}
616
617func (a *authedAction) HTTPRequest(url url.URL) *http.Request {
618 r := a.act.HTTPRequest(url)
619 r.SetBasicAuth(a.credentials.username, a.credentials.password)
620 return r
621}
622
623type redirectFollowingHTTPClient struct {
624 client httpClient
625 checkRedirect CheckRedirectFunc
626}
627
628func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
629 next := act
630 for i := 0; i < 100; i++ {
631 if i > 0 {
632 if err := r.checkRedirect(i); err != nil {
633 return nil, nil, err
634 }
635 }
636 resp, body, err := r.client.Do(ctx, next)
637 if err != nil {
638 return nil, nil, err
639 }
640 if resp.StatusCode/100 == 3 {
641 hdr := resp.Header.Get("Location")
642 if hdr == "" {
643 return nil, nil, fmt.Errorf("location header not set")
644 }
645 loc, err := url.Parse(hdr)
646 if err != nil {
647 return nil, nil, fmt.Errorf("location header not valid URL: %s", hdr)
648 }
649 next = &redirectedHTTPAction{
650 action: act,
651 location: *loc,
652 }
653 continue
654 }
655 return resp, body, nil
656 }
657
658 return nil, nil, errTooManyRedirectChecks
659}
660
661type redirectedHTTPAction struct {
662 action httpAction
663 location url.URL
664}
665
666func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
667 orig := r.action.HTTPRequest(ep)
668 orig.URL = &r.location
669 return orig
670}
671
// shuffleEndpoints returns a new slice holding eps in an order drawn from r;
// eps itself is left untouched.
//
// The permutation is built with the inside-out shuffle copied from
// rand.Rand.Perm as of Go 1.9. The resulting order therefore depends only on
// r's output, not on whichever Perm implementation the running Go release
// ships.
func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL {
	n := len(eps)
	perm := make([]int, n)
	for i := range perm {
		j := r.Intn(i + 1)
		perm[i], perm[j] = perm[j], i
	}

	out := make([]url.URL, 0, n)
	for _, idx := range perm {
		out = append(out, eps[idx])
	}
	return out
}
687
// endpointsEqual reports whether left and right hold the same endpoints,
// compared by their String() form and ignoring order. Duplicates must occur
// the same number of times on both sides.
func endpointsEqual(left, right []url.URL) bool {
	if len(left) != len(right) {
		return false
	}

	sortedStrings := func(eps []url.URL) []string {
		out := make([]string, 0, len(eps))
		for i := range eps {
			out = append(out, eps[i].String())
		}
		sort.Strings(out)
		return out
	}

	a, b := sortedStrings(left), sortedStrings(right)
	for i, s := range a {
		if s != b[i] {
			return false
		}
	}
	return true
}