// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package client |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "encoding/json" |
| 20 | "errors" |
| 21 | "fmt" |
| 22 | "io/ioutil" |
| 23 | "math/rand" |
| 24 | "net" |
| 25 | "net/http" |
| 26 | "net/url" |
| 27 | "sort" |
| 28 | "strconv" |
| 29 | "sync" |
| 30 | "time" |
| 31 | |
| 32 | "go.etcd.io/etcd/version" |
| 33 | ) |
| 34 | |
var (
	// ErrNoEndpoints is returned when an empty endpoint list is supplied
	// or when a request is attempted while no endpoints are configured.
	ErrNoEndpoints = errors.New("client: no endpoints available")
	// ErrTooManyRedirects is returned by DefaultCheckRedirect once its
	// redirect limit has been exceeded.
	ErrTooManyRedirects = errors.New("client: too many redirects")
	// ErrClusterUnavailable reports that the etcd cluster is unavailable
	// or misconfigured.
	ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
	// ErrNoLeaderEndpoint is returned when the cluster leader cannot be
	// determined or advertises no client URLs.
	ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
	// errTooManyRedirectChecks is returned when the redirect-following
	// client exhausts its hard cap on attempts.
	errTooManyRedirectChecks = errors.New("client: too many redirect checks")

	// oneShotCtxValue is set on a context using
	// context.WithValue(ctx, &oneShotCtxValue, <non-nil value>) so that
	// Do() will not retry a request. Only the variable's address is used,
	// as the context key.
	oneShotCtxValue interface{}
)
| 46 | |
// DefaultRequestTimeout is the package's default request timeout.
// NOTE(review): it is not referenced in this file; presumably it is
// consumed by callers or by other files in the package.
var DefaultRequestTimeout = 5 * time.Second

// DefaultTransport is the transport used by a Client whose Config does
// not provide one. It honours the proxy environment variables, dials with
// a 30s connect timeout and 30s TCP keep-alive, and bounds the TLS
// handshake to 10s.
var DefaultTransport CancelableTransport = &http.Transport{
	Proxy: http.ProxyFromEnvironment,
	Dial: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}).Dial,
	TLSHandshakeTimeout: 10 * time.Second,
}
| 57 | |
// EndpointSelectionMode selects the policy a Client uses to choose which
// cluster member receives its requests.
type EndpointSelectionMode int

const (
	// EndpointSelectionRandom is the default value of the 'SelectionMode'.
	// As the name implies, the client object will pick a node from the members
	// of the cluster in a random fashion. If the cluster has three members, A, B,
	// and C, the client picks any node from its three members as its request
	// destination.
	EndpointSelectionRandom EndpointSelectionMode = iota

	// EndpointSelectionPrioritizeLeader sends requests directly to the
	// cluster leader. This reduces forwarding roundtrips compared to making
	// requests to etcd followers who then forward them to the cluster
	// leader. In the event of a leader failure, however, clients configured
	// this way cannot prioritize among the remaining etcd followers.
	// Therefore, when a client sets 'SelectionMode' to
	// 'EndpointSelectionPrioritizeLeader', it must use 'client.AutoSync()'
	// to maintain its knowledge of current cluster state.
	//
	// This mode should be used with Client.AutoSync().
	EndpointSelectionPrioritizeLeader
)
| 80 | |
// Config holds the settings used by New to build a Client.
type Config struct {
	// Endpoints defines a set of URLs (schemes, hosts and ports only)
	// that can be used to communicate with a logical etcd cluster. For
	// example, a three-node cluster could be provided like so:
	//
	//	Endpoints: []string{
	//		"http://node1.example.com:2379",
	//		"http://node2.example.com:2379",
	//		"http://node3.example.com:2379",
	//	}
	//
	// If multiple endpoints are provided, the Client will attempt to
	// use them all in the event that one or more of them are unusable.
	//
	// If Client.Sync is ever called, the Client may cache an alternate
	// set of endpoints to continue operation.
	Endpoints []string

	// Transport is used by the Client to drive HTTP requests. If not
	// provided, DefaultTransport will be used.
	Transport CancelableTransport

	// CheckRedirect specifies the policy for handling HTTP redirects.
	// If CheckRedirect is not nil, the Client calls it before
	// following an HTTP redirect. The sole argument is the number of
	// requests that have already been made. If CheckRedirect returns
	// an error, Client.Do will not make any further requests and will
	// return the error back to the caller.
	//
	// If CheckRedirect is nil, the Client uses its default policy,
	// which is to stop after 10 consecutive requests.
	CheckRedirect CheckRedirectFunc

	// Username specifies the user credential to add as an authorization header.
	// Credentials are only attached when Username is non-empty.
	Username string

	// Password is the password for the specified user to add as an authorization header
	// to the request.
	Password string

	// HeaderTimeoutPerRequest specifies the time limit to wait for response
	// header in a single request made by the Client. The timeout includes
	// connection time, any redirects, and header wait time.
	//
	// For non-watch GET request, server returns the response body immediately.
	// For PUT/POST/DELETE request, server will attempt to commit request
	// before responding, which is expected to take `100ms + 2 * RTT`.
	// For watch request, server returns the header immediately to notify Client
	// watch start. But if server is behind some kind of proxy, the response
	// header may be cached at proxy, and Client cannot rely on this behavior.
	//
	// In particular, wait requests ignore this timeout.
	//
	// One API call may send multiple requests to different etcd servers until it
	// succeeds. Use context of the API to specify the overall timeout.
	//
	// A HeaderTimeoutPerRequest of zero means no timeout.
	HeaderTimeoutPerRequest time.Duration

	// SelectionMode is an EndpointSelectionMode enum that specifies the
	// policy for choosing the etcd cluster node to which requests are sent.
	SelectionMode EndpointSelectionMode
}
| 144 | |
| 145 | func (cfg *Config) transport() CancelableTransport { |
| 146 | if cfg.Transport == nil { |
| 147 | return DefaultTransport |
| 148 | } |
| 149 | return cfg.Transport |
| 150 | } |
| 151 | |
| 152 | func (cfg *Config) checkRedirect() CheckRedirectFunc { |
| 153 | if cfg.CheckRedirect == nil { |
| 154 | return DefaultCheckRedirect |
| 155 | } |
| 156 | return cfg.CheckRedirect |
| 157 | } |
| 158 | |
// CancelableTransport mimics net/http.Transport, but requires that
// the object also support request cancellation.
type CancelableTransport interface {
	http.RoundTripper
	// CancelRequest aborts the in-flight request req.
	CancelRequest(req *http.Request)
}
| 165 | |
| 166 | type CheckRedirectFunc func(via int) error |
| 167 | |
| 168 | // DefaultCheckRedirect follows up to 10 redirects, but no more. |
| 169 | var DefaultCheckRedirect CheckRedirectFunc = func(via int) error { |
| 170 | if via > 10 { |
| 171 | return ErrTooManyRedirects |
| 172 | } |
| 173 | return nil |
| 174 | } |
| 175 | |
// Client is the interface through which callers talk to an etcd cluster:
// it manages the set of endpoints and executes HTTP actions against them.
type Client interface {
	// Sync updates the internal cache of the etcd cluster's membership.
	Sync(context.Context) error

	// AutoSync periodically calls Sync() every given interval.
	// The recommended sync interval is 10 seconds to 1 minute, which does
	// not add much overhead to the server and lets the client catch up
	// with cluster changes in time.
	//
	// An example of how to use it:
	//
	//	for {
	//		err := client.AutoSync(ctx, 10*time.Second)
	//		if err == context.DeadlineExceeded || err == context.Canceled {
	//			break
	//		}
	//		log.Print(err)
	//	}
	AutoSync(context.Context, time.Duration) error

	// Endpoints returns a copy of the current set of API endpoints used
	// by Client to resolve HTTP requests. If Sync has ever been called,
	// this may differ from the initial Endpoints provided in the Config.
	Endpoints() []string

	// SetEndpoints sets the set of API endpoints used by Client to resolve
	// HTTP requests. If the given endpoints are not valid, an error will be
	// returned.
	SetEndpoints(eps []string) error

	// GetVersion retrieves the current etcd server and cluster version.
	GetVersion(ctx context.Context) (*version.Versions, error)

	httpClient
}
| 211 | |
| 212 | func New(cfg Config) (Client, error) { |
| 213 | c := &httpClusterClient{ |
| 214 | clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest), |
| 215 | rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))), |
| 216 | selectionMode: cfg.SelectionMode, |
| 217 | } |
| 218 | if cfg.Username != "" { |
| 219 | c.credentials = &credentials{ |
| 220 | username: cfg.Username, |
| 221 | password: cfg.Password, |
| 222 | } |
| 223 | } |
| 224 | if err := c.SetEndpoints(cfg.Endpoints); err != nil { |
| 225 | return nil, err |
| 226 | } |
| 227 | return c, nil |
| 228 | } |
| 229 | |
// httpClient executes a single httpAction and returns the HTTP response
// together with its body.
type httpClient interface {
	Do(context.Context, httpAction) (*http.Response, []byte, error)
}
| 233 | |
| 234 | func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory { |
| 235 | return func(ep url.URL) httpClient { |
| 236 | return &redirectFollowingHTTPClient{ |
| 237 | checkRedirect: cr, |
| 238 | client: &simpleHTTPClient{ |
| 239 | transport: tr, |
| 240 | endpoint: ep, |
| 241 | headerTimeout: headerTimeout, |
| 242 | }, |
| 243 | } |
| 244 | } |
| 245 | } |
| 246 | |
// credentials holds the basic-auth user name and password attached to
// outgoing requests.
type credentials struct {
	username string
	password string
}

// httpClientFactory builds an httpClient bound to the given endpoint.
type httpClientFactory func(url.URL) httpClient

// httpAction produces the HTTP request to send to a given endpoint.
type httpAction interface {
	HTTPRequest(url.URL) *http.Request
}
| 257 | |
// httpClusterClient is the Client implementation returned by New. It
// fans a request out over the known endpoints until one succeeds.
type httpClusterClient struct {
	// clientFactory builds the per-endpoint client used for each attempt.
	clientFactory httpClientFactory
	// endpoints is the current endpoint list; accessed under the embedded lock.
	endpoints []url.URL
	// pinned is the index into endpoints that Do tries first; accessed
	// under the embedded lock.
	pinned int
	// credentials, when non-nil, are attached to every request as basic auth.
	credentials *credentials
	sync.RWMutex
	// rand is the source used to shuffle endpoints.
	rand *rand.Rand
	// selectionMode controls how Sync picks the pinned endpoint.
	selectionMode EndpointSelectionMode
}
| 267 | |
| 268 | func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) { |
| 269 | ceps := make([]url.URL, len(eps)) |
| 270 | copy(ceps, eps) |
| 271 | |
| 272 | // To perform a lookup on the new endpoint list without using the current |
| 273 | // client, we'll copy it |
| 274 | clientCopy := &httpClusterClient{ |
| 275 | clientFactory: c.clientFactory, |
| 276 | credentials: c.credentials, |
| 277 | rand: c.rand, |
| 278 | |
| 279 | pinned: 0, |
| 280 | endpoints: ceps, |
| 281 | } |
| 282 | |
| 283 | mAPI := NewMembersAPI(clientCopy) |
| 284 | leader, err := mAPI.Leader(ctx) |
| 285 | if err != nil { |
| 286 | return "", err |
| 287 | } |
| 288 | if len(leader.ClientURLs) == 0 { |
| 289 | return "", ErrNoLeaderEndpoint |
| 290 | } |
| 291 | |
| 292 | return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs? |
| 293 | } |
| 294 | |
| 295 | func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) { |
| 296 | if len(eps) == 0 { |
| 297 | return []url.URL{}, ErrNoEndpoints |
| 298 | } |
| 299 | |
| 300 | neps := make([]url.URL, len(eps)) |
| 301 | for i, ep := range eps { |
| 302 | u, err := url.Parse(ep) |
| 303 | if err != nil { |
| 304 | return []url.URL{}, err |
| 305 | } |
| 306 | neps[i] = *u |
| 307 | } |
| 308 | return neps, nil |
| 309 | } |
| 310 | |
| 311 | func (c *httpClusterClient) SetEndpoints(eps []string) error { |
| 312 | neps, err := c.parseEndpoints(eps) |
| 313 | if err != nil { |
| 314 | return err |
| 315 | } |
| 316 | |
| 317 | c.Lock() |
| 318 | defer c.Unlock() |
| 319 | |
| 320 | c.endpoints = shuffleEndpoints(c.rand, neps) |
| 321 | // We're not doing anything for PrioritizeLeader here. This is |
| 322 | // due to not having a context meaning we can't call getLeaderEndpoint |
| 323 | // However, if you're using PrioritizeLeader, you've already been told |
| 324 | // to regularly call sync, where we do have a ctx, and can figure the |
| 325 | // leader. PrioritizeLeader is also quite a loose guarantee, so deal |
| 326 | // with it |
| 327 | c.pinned = 0 |
| 328 | |
| 329 | return nil |
| 330 | } |
| 331 | |
| 332 | func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { |
| 333 | action := act |
| 334 | c.RLock() |
| 335 | leps := len(c.endpoints) |
| 336 | eps := make([]url.URL, leps) |
| 337 | n := copy(eps, c.endpoints) |
| 338 | pinned := c.pinned |
| 339 | |
| 340 | if c.credentials != nil { |
| 341 | action = &authedAction{ |
| 342 | act: act, |
| 343 | credentials: *c.credentials, |
| 344 | } |
| 345 | } |
| 346 | c.RUnlock() |
| 347 | |
| 348 | if leps == 0 { |
| 349 | return nil, nil, ErrNoEndpoints |
| 350 | } |
| 351 | |
| 352 | if leps != n { |
| 353 | return nil, nil, errors.New("unable to pick endpoint: copy failed") |
| 354 | } |
| 355 | |
| 356 | var resp *http.Response |
| 357 | var body []byte |
| 358 | var err error |
| 359 | cerr := &ClusterError{} |
| 360 | isOneShot := ctx.Value(&oneShotCtxValue) != nil |
| 361 | |
| 362 | for i := pinned; i < leps+pinned; i++ { |
| 363 | k := i % leps |
| 364 | hc := c.clientFactory(eps[k]) |
| 365 | resp, body, err = hc.Do(ctx, action) |
| 366 | if err != nil { |
| 367 | cerr.Errors = append(cerr.Errors, err) |
| 368 | if err == ctx.Err() { |
| 369 | return nil, nil, ctx.Err() |
| 370 | } |
| 371 | if err == context.Canceled || err == context.DeadlineExceeded { |
| 372 | return nil, nil, err |
| 373 | } |
| 374 | } else if resp.StatusCode/100 == 5 { |
| 375 | switch resp.StatusCode { |
| 376 | case http.StatusInternalServerError, http.StatusServiceUnavailable: |
| 377 | // TODO: make sure this is a no leader response |
| 378 | cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String())) |
| 379 | default: |
| 380 | cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode))) |
| 381 | } |
| 382 | err = cerr.Errors[0] |
| 383 | } |
| 384 | if err != nil { |
| 385 | if !isOneShot { |
| 386 | continue |
| 387 | } |
| 388 | c.Lock() |
| 389 | c.pinned = (k + 1) % leps |
| 390 | c.Unlock() |
| 391 | return nil, nil, err |
| 392 | } |
| 393 | if k != pinned { |
| 394 | c.Lock() |
| 395 | c.pinned = k |
| 396 | c.Unlock() |
| 397 | } |
| 398 | return resp, body, nil |
| 399 | } |
| 400 | |
| 401 | return nil, nil, cerr |
| 402 | } |
| 403 | |
| 404 | func (c *httpClusterClient) Endpoints() []string { |
| 405 | c.RLock() |
| 406 | defer c.RUnlock() |
| 407 | |
| 408 | eps := make([]string, len(c.endpoints)) |
| 409 | for i, ep := range c.endpoints { |
| 410 | eps[i] = ep.String() |
| 411 | } |
| 412 | |
| 413 | return eps |
| 414 | } |
| 415 | |
| 416 | func (c *httpClusterClient) Sync(ctx context.Context) error { |
| 417 | mAPI := NewMembersAPI(c) |
| 418 | ms, err := mAPI.List(ctx) |
| 419 | if err != nil { |
| 420 | return err |
| 421 | } |
| 422 | |
| 423 | var eps []string |
| 424 | for _, m := range ms { |
| 425 | eps = append(eps, m.ClientURLs...) |
| 426 | } |
| 427 | |
| 428 | neps, err := c.parseEndpoints(eps) |
| 429 | if err != nil { |
| 430 | return err |
| 431 | } |
| 432 | |
| 433 | npin := 0 |
| 434 | |
| 435 | switch c.selectionMode { |
| 436 | case EndpointSelectionRandom: |
| 437 | c.RLock() |
| 438 | eq := endpointsEqual(c.endpoints, neps) |
| 439 | c.RUnlock() |
| 440 | |
| 441 | if eq { |
| 442 | return nil |
| 443 | } |
| 444 | // When items in the endpoint list changes, we choose a new pin |
| 445 | neps = shuffleEndpoints(c.rand, neps) |
| 446 | case EndpointSelectionPrioritizeLeader: |
| 447 | nle, err := c.getLeaderEndpoint(ctx, neps) |
| 448 | if err != nil { |
| 449 | return ErrNoLeaderEndpoint |
| 450 | } |
| 451 | |
| 452 | for i, n := range neps { |
| 453 | if n.String() == nle { |
| 454 | npin = i |
| 455 | break |
| 456 | } |
| 457 | } |
| 458 | default: |
| 459 | return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode) |
| 460 | } |
| 461 | |
| 462 | c.Lock() |
| 463 | defer c.Unlock() |
| 464 | c.endpoints = neps |
| 465 | c.pinned = npin |
| 466 | |
| 467 | return nil |
| 468 | } |
| 469 | |
| 470 | func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error { |
| 471 | ticker := time.NewTicker(interval) |
| 472 | defer ticker.Stop() |
| 473 | for { |
| 474 | err := c.Sync(ctx) |
| 475 | if err != nil { |
| 476 | return err |
| 477 | } |
| 478 | select { |
| 479 | case <-ctx.Done(): |
| 480 | return ctx.Err() |
| 481 | case <-ticker.C: |
| 482 | } |
| 483 | } |
| 484 | } |
| 485 | |
| 486 | func (c *httpClusterClient) GetVersion(ctx context.Context) (*version.Versions, error) { |
| 487 | act := &getAction{Prefix: "/version"} |
| 488 | |
| 489 | resp, body, err := c.Do(ctx, act) |
| 490 | if err != nil { |
| 491 | return nil, err |
| 492 | } |
| 493 | |
| 494 | switch resp.StatusCode { |
| 495 | case http.StatusOK: |
| 496 | if len(body) == 0 { |
| 497 | return nil, ErrEmptyBody |
| 498 | } |
| 499 | var vresp version.Versions |
| 500 | if err := json.Unmarshal(body, &vresp); err != nil { |
| 501 | return nil, ErrInvalidJSON |
| 502 | } |
| 503 | return &vresp, nil |
| 504 | default: |
| 505 | var etcdErr Error |
| 506 | if err := json.Unmarshal(body, &etcdErr); err != nil { |
| 507 | return nil, ErrInvalidJSON |
| 508 | } |
| 509 | return nil, etcdErr |
| 510 | } |
| 511 | } |
| 512 | |
// roundTripResponse carries the result of a transport round trip across
// a channel.
type roundTripResponse struct {
	resp *http.Response
	err  error
}

// simpleHTTPClient sends requests to a single endpoint through transport.
type simpleHTTPClient struct {
	transport CancelableTransport
	endpoint  url.URL
	// headerTimeout bounds the wait for response headers on non-wait
	// requests; zero or negative disables the bound.
	headerTimeout time.Duration
}
| 523 | |
// Do builds the request for act against c.endpoint, sends it through the
// transport, and returns the response together with its fully read body.
// The response body is always closed before Do returns.
//
// Waiting for response headers is bounded by c.headerTimeout unless the
// request carries a true "wait" query parameter or the timeout is not
// positive. If ctx ends first, ctx.Err() is returned; if only the header
// timeout fires, an "exceeded header timeout" error is returned. Reading
// the body is bounded by ctx alone.
func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
	req := act.HTTPRequest(c.endpoint)

	// printcURL is defined outside this file; presumably it emits a cURL
	// rendering of req for debugging.
	if err := printcURL(req); err != nil {
		return nil, nil, err
	}

	// Wait (long-poll) requests are exempt from the header timeout.
	isWait := false
	if req != nil && req.URL != nil {
		ws := req.URL.Query().Get("wait")
		if len(ws) != 0 {
			var err error
			isWait, err = strconv.ParseBool(ws)
			if err != nil {
				return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
			}
		}
	}

	// hctx governs only the header wait; ctx remains the overall bound.
	var hctx context.Context
	var hcancel context.CancelFunc
	if !isWait && c.headerTimeout > 0 {
		hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
	} else {
		hctx, hcancel = context.WithCancel(ctx)
	}
	defer hcancel()

	// requestCanceler is defined outside this file; it returns the function
	// used below to abort req on the transport.
	reqcancel := requestCanceler(c.transport, req)

	// Buffered so the round-trip goroutine can always deliver its result
	// and exit, whichever select branch runs.
	rtchan := make(chan roundTripResponse, 1)
	go func() {
		resp, err := c.transport.RoundTrip(req)
		rtchan <- roundTripResponse{resp: resp, err: err}
		close(rtchan)
	}()

	var resp *http.Response
	var err error

	select {
	case rtresp := <-rtchan:
		resp, err = rtresp.resp, rtresp.err
	case <-hctx.Done():
		// cancel and wait for request to actually exit before continuing
		reqcancel()
		rtresp := <-rtchan
		resp = rtresp.resp
		// Prefer the caller's context error over the header timeout.
		switch {
		case ctx.Err() != nil:
			err = ctx.Err()
		case hctx.Err() != nil:
			err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
		default:
			panic("failed to get error from context")
		}
	}

	// always check for resp nil-ness to deal with possible
	// race conditions between channels above
	defer func() {
		if resp != nil {
			resp.Body.Close()
		}
	}()

	if err != nil {
		return nil, nil, err
	}

	// Read the body in a goroutine so the read can be abandoned when ctx
	// ends. body and err are written by the goroutine and read only after
	// receiving from done.
	var body []byte
	done := make(chan struct{})
	go func() {
		body, err = ioutil.ReadAll(resp.Body)
		done <- struct{}{}
	}()

	select {
	case <-ctx.Done():
		// Closing the body unblocks ReadAll; wait for the reader to finish
		// so it does not outlive this call.
		resp.Body.Close()
		<-done
		return nil, nil, ctx.Err()
	case <-done:
	}

	return resp, body, err
}
| 611 | |
| 612 | type authedAction struct { |
| 613 | act httpAction |
| 614 | credentials credentials |
| 615 | } |
| 616 | |
| 617 | func (a *authedAction) HTTPRequest(url url.URL) *http.Request { |
| 618 | r := a.act.HTTPRequest(url) |
| 619 | r.SetBasicAuth(a.credentials.username, a.credentials.password) |
| 620 | return r |
| 621 | } |
| 622 | |
| 623 | type redirectFollowingHTTPClient struct { |
| 624 | client httpClient |
| 625 | checkRedirect CheckRedirectFunc |
| 626 | } |
| 627 | |
| 628 | func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { |
| 629 | next := act |
| 630 | for i := 0; i < 100; i++ { |
| 631 | if i > 0 { |
| 632 | if err := r.checkRedirect(i); err != nil { |
| 633 | return nil, nil, err |
| 634 | } |
| 635 | } |
| 636 | resp, body, err := r.client.Do(ctx, next) |
| 637 | if err != nil { |
| 638 | return nil, nil, err |
| 639 | } |
| 640 | if resp.StatusCode/100 == 3 { |
| 641 | hdr := resp.Header.Get("Location") |
| 642 | if hdr == "" { |
| 643 | return nil, nil, fmt.Errorf("location header not set") |
| 644 | } |
| 645 | loc, err := url.Parse(hdr) |
| 646 | if err != nil { |
| 647 | return nil, nil, fmt.Errorf("location header not valid URL: %s", hdr) |
| 648 | } |
| 649 | next = &redirectedHTTPAction{ |
| 650 | action: act, |
| 651 | location: *loc, |
| 652 | } |
| 653 | continue |
| 654 | } |
| 655 | return resp, body, nil |
| 656 | } |
| 657 | |
| 658 | return nil, nil, errTooManyRedirectChecks |
| 659 | } |
| 660 | |
| 661 | type redirectedHTTPAction struct { |
| 662 | action httpAction |
| 663 | location url.URL |
| 664 | } |
| 665 | |
| 666 | func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request { |
| 667 | orig := r.action.HTTPRequest(ep) |
| 668 | orig.URL = &r.location |
| 669 | return orig |
| 670 | } |
| 671 | |
// shuffleEndpoints returns a new slice holding the elements of eps in a
// pseudo-random order drawn from r. eps itself is not modified.
//
// It is the inside-out shuffle used by rand.Rand.Perm in Go 1.9 and
// earlier, applied directly to the endpoints; it draws from r exactly as
// that Perm did, so a given seed always yields the same order.
func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL {
	shuffled := make([]url.URL, len(eps))
	for i := range eps {
		j := r.Intn(i + 1)
		shuffled[i] = shuffled[j]
		shuffled[j] = eps[i]
	}
	return shuffled
}
| 687 | |
// endpointsEqual reports whether left and right contain the same
// endpoints, compared by their string form and ignoring order.
// Duplicates count: each URL must occur the same number of times on both
// sides.
func endpointsEqual(left, right []url.URL) bool {
	if len(left) != len(right) {
		return false
	}

	// sortedStrings renders each URL and returns the results sorted.
	sortedStrings := func(eps []url.URL) []string {
		out := make([]string, 0, len(eps))
		for i := range eps {
			out = append(out, eps[i].String())
		}
		sort.Strings(out)
		return out
	}

	a, b := sortedStrings(left), sortedStrings(right)
	for i, s := range a {
		if b[i] != s {
			return false
		}
	}
	return true
}