blob: 0570615fcc3c04912c1db7ea749791fe2a99bbe2 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package rest
18
19import (
20 "bytes"
21 "context"
22 "encoding/hex"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "mime"
27 "net/http"
28 "net/url"
29 "path"
30 "reflect"
31 "strconv"
32 "strings"
33 "time"
34
35 "golang.org/x/net/http2"
36 "k8s.io/apimachinery/pkg/api/errors"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
41 "k8s.io/apimachinery/pkg/util/net"
42 "k8s.io/apimachinery/pkg/watch"
43 restclientwatch "k8s.io/client-go/rest/watch"
44 "k8s.io/client-go/tools/metrics"
45 "k8s.io/client-go/util/flowcontrol"
46 "k8s.io/klog"
47)
48
// longThrottleLatency is the logging threshold for client-side throttling:
// tryThrottle logs any request whose wait on the rate limiter exceeded it.
var longThrottleLatency = 50 * time.Millisecond
54
// HTTPClient is the minimal HTTP transport a Request needs. It exists so that
// tests can substitute their own implementation for a real HTTP client.
type HTTPClient interface {
	// Do sends request and returns the server's response or an error.
	Do(request *http.Request) (*http.Response, error)
}
59
// ResponseWrapper exposes a response in one of two ways: fully buffered in
// memory as raw bytes, or as a stream the caller reads incrementally.
type ResponseWrapper interface {
	// DoRaw returns the entire response body as a byte slice.
	DoRaw() ([]byte, error)
	// Stream returns the response body as a stream.
	Stream() (io.ReadCloser, error)
}
66
// RequestConstructionError wraps a failure that occurred while a request was
// being assembled.
type RequestConstructionError struct {
	// Err is the underlying cause.
	Err error
}

// Error renders the wrapped cause inside a fixed, human-readable prefix.
func (r *RequestConstructionError) Error() string {
	const format = "request construction error: '%v'"
	return fmt.Sprintf(format, r.Err)
}
76
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
	// required
	client HTTPClient // transport; http.DefaultClient is used when nil
	verb   string     // HTTP method handed to http.NewRequest

	baseURL     *url.URL      // may be nil; copied into the result of URL()
	content     ContentConfig // content types used for the Accept / Content-Type headers
	serializers Serializers   // encoder, decoder and framer used for bodies and watches

	// generic components accessible via method setters
	pathPrefix string      // leading part of the path (see Prefix, AbsPath, RequestURI)
	subpath    string      // trailing part of the path (see Suffix)
	params     url.Values  // query parameters; created lazily
	headers    http.Header // request headers; created lazily by SetHeader

	// structural elements of the request that are part of the Kubernetes API conventions
	namespace    string
	namespaceSet bool // true once Namespace has been called, even with an empty value
	resource     string
	resourceName string
	subresource  string
	timeout      time.Duration // sent as the "timeout" query parameter and applied as a context timeout

	// output
	err  error     // first error recorded by a builder method; reported when the request runs
	body io.Reader // request payload, if any

	// This is only used for per-request timeouts, deadlines, and cancellations.
	ctx context.Context

	backoffMgr BackoffManager          // consulted before, and updated after, each attempt
	throttle   flowcontrol.RateLimiter // optional client-side rate limiter; nil disables throttling
}
113
114// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
115func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
116 if backoff == nil {
117 klog.V(2).Infof("Not implementing request backoff strategy.")
118 backoff = &NoBackoff{}
119 }
120
121 pathPrefix := "/"
122 if baseURL != nil {
123 pathPrefix = path.Join(pathPrefix, baseURL.Path)
124 }
125 r := &Request{
126 client: client,
127 verb: verb,
128 baseURL: baseURL,
129 pathPrefix: path.Join(pathPrefix, versionedAPIPath),
130 content: content,
131 serializers: serializers,
132 backoffMgr: backoff,
133 throttle: throttle,
134 timeout: timeout,
135 }
136 switch {
137 case len(content.AcceptContentTypes) > 0:
138 r.SetHeader("Accept", content.AcceptContentTypes)
139 case len(content.ContentType) > 0:
140 r.SetHeader("Accept", content.ContentType+", */*")
141 }
142 return r
143}
144
145// Prefix adds segments to the relative beginning to the request path. These
146// items will be placed before the optional Namespace, Resource, or Name sections.
147// Setting AbsPath will clear any previously set Prefix segments
148func (r *Request) Prefix(segments ...string) *Request {
149 if r.err != nil {
150 return r
151 }
152 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
153 return r
154}
155
156// Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
157// Namespace, Resource, or Name sections.
158func (r *Request) Suffix(segments ...string) *Request {
159 if r.err != nil {
160 return r
161 }
162 r.subpath = path.Join(r.subpath, path.Join(segments...))
163 return r
164}
165
166// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
167func (r *Request) Resource(resource string) *Request {
168 if r.err != nil {
169 return r
170 }
171 if len(r.resource) != 0 {
172 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
173 return r
174 }
175 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
176 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
177 return r
178 }
179 r.resource = resource
180 return r
181}
182
183// BackOff sets the request's backoff manager to the one specified,
184// or defaults to the stub implementation if nil is provided
185func (r *Request) BackOff(manager BackoffManager) *Request {
186 if manager == nil {
187 r.backoffMgr = &NoBackoff{}
188 return r
189 }
190
191 r.backoffMgr = manager
192 return r
193}
194
195// Throttle receives a rate-limiter and sets or replaces an existing request limiter
196func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
197 r.throttle = limiter
198 return r
199}
200
201// SubResource sets a sub-resource path which can be multiple segments after the resource
202// name but before the suffix.
203func (r *Request) SubResource(subresources ...string) *Request {
204 if r.err != nil {
205 return r
206 }
207 subresource := path.Join(subresources...)
208 if len(r.subresource) != 0 {
209 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
210 return r
211 }
212 for _, s := range subresources {
213 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
214 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
215 return r
216 }
217 }
218 r.subresource = subresource
219 return r
220}
221
222// Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
223func (r *Request) Name(resourceName string) *Request {
224 if r.err != nil {
225 return r
226 }
227 if len(resourceName) == 0 {
228 r.err = fmt.Errorf("resource name may not be empty")
229 return r
230 }
231 if len(r.resourceName) != 0 {
232 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
233 return r
234 }
235 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
236 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
237 return r
238 }
239 r.resourceName = resourceName
240 return r
241}
242
243// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
244func (r *Request) Namespace(namespace string) *Request {
245 if r.err != nil {
246 return r
247 }
248 if r.namespaceSet {
249 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
250 return r
251 }
252 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
253 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
254 return r
255 }
256 r.namespaceSet = true
257 r.namespace = namespace
258 return r
259}
260
261// NamespaceIfScoped is a convenience function to set a namespace if scoped is true
262func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
263 if scoped {
264 return r.Namespace(namespace)
265 }
266 return r
267}
268
269// AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
270// when a single segment is passed.
271func (r *Request) AbsPath(segments ...string) *Request {
272 if r.err != nil {
273 return r
274 }
275 r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
276 if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
277 // preserve any trailing slashes for legacy behavior
278 r.pathPrefix += "/"
279 }
280 return r
281}
282
283// RequestURI overwrites existing path and parameters with the value of the provided server relative
284// URI.
285func (r *Request) RequestURI(uri string) *Request {
286 if r.err != nil {
287 return r
288 }
289 locator, err := url.Parse(uri)
290 if err != nil {
291 r.err = err
292 return r
293 }
294 r.pathPrefix = locator.Path
295 if len(locator.Query()) > 0 {
296 if r.params == nil {
297 r.params = make(url.Values)
298 }
299 for k, v := range locator.Query() {
300 r.params[k] = v
301 }
302 }
303 return r
304}
305
306// Param creates a query parameter with the given string value.
307func (r *Request) Param(paramName, s string) *Request {
308 if r.err != nil {
309 return r
310 }
311 return r.setParam(paramName, s)
312}
313
314// VersionedParams will take the provided object, serialize it to a map[string][]string using the
315// implicit RESTClient API version and the default parameter codec, and then add those as parameters
316// to the request. Use this to provide versioned query parameters from client libraries.
317// VersionedParams will not write query parameters that have omitempty set and are empty. If a
318// parameter has already been set it is appended to (Params and VersionedParams are additive).
319func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
320 return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
321}
322
323func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
324 if r.err != nil {
325 return r
326 }
327 params, err := codec.EncodeParameters(obj, version)
328 if err != nil {
329 r.err = err
330 return r
331 }
332 for k, v := range params {
333 if r.params == nil {
334 r.params = make(url.Values)
335 }
336 r.params[k] = append(r.params[k], v...)
337 }
338 return r
339}
340
341func (r *Request) setParam(paramName, value string) *Request {
342 if r.params == nil {
343 r.params = make(url.Values)
344 }
345 r.params[paramName] = append(r.params[paramName], value)
346 return r
347}
348
349func (r *Request) SetHeader(key string, values ...string) *Request {
350 if r.headers == nil {
351 r.headers = http.Header{}
352 }
353 r.headers.Del(key)
354 for _, value := range values {
355 r.headers.Add(key, value)
356 }
357 return r
358}
359
360// Timeout makes the request use the given duration as an overall timeout for the
361// request. Additionally, if set passes the value as "timeout" parameter in URL.
362func (r *Request) Timeout(d time.Duration) *Request {
363 if r.err != nil {
364 return r
365 }
366 r.timeout = d
367 return r
368}
369
370// Body makes the request use obj as the body. Optional.
371// If obj is a string, try to read a file of that name.
372// If obj is a []byte, send it directly.
373// If obj is an io.Reader, use it directly.
374// If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
375// If obj is a runtime.Object and nil, do nothing.
376// Otherwise, set an error.
377func (r *Request) Body(obj interface{}) *Request {
378 if r.err != nil {
379 return r
380 }
381 switch t := obj.(type) {
382 case string:
383 data, err := ioutil.ReadFile(t)
384 if err != nil {
385 r.err = err
386 return r
387 }
388 glogBody("Request Body", data)
389 r.body = bytes.NewReader(data)
390 case []byte:
391 glogBody("Request Body", t)
392 r.body = bytes.NewReader(t)
393 case io.Reader:
394 r.body = t
395 case runtime.Object:
396 // callers may pass typed interface pointers, therefore we must check nil with reflection
397 if reflect.ValueOf(t).IsNil() {
398 return r
399 }
400 data, err := runtime.Encode(r.serializers.Encoder, t)
401 if err != nil {
402 r.err = err
403 return r
404 }
405 glogBody("Request Body", data)
406 r.body = bytes.NewReader(data)
407 r.SetHeader("Content-Type", r.content.ContentType)
408 default:
409 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
410 }
411 return r
412}
413
414// Context adds a context to the request. Contexts are only used for
415// timeouts, deadlines, and cancellations.
416func (r *Request) Context(ctx context.Context) *Request {
417 r.ctx = ctx
418 return r
419}
420
421// URL returns the current working URL.
422func (r *Request) URL() *url.URL {
423 p := r.pathPrefix
424 if r.namespaceSet && len(r.namespace) > 0 {
425 p = path.Join(p, "namespaces", r.namespace)
426 }
427 if len(r.resource) != 0 {
428 p = path.Join(p, strings.ToLower(r.resource))
429 }
430 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
431 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
432 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
433 }
434
435 finalURL := &url.URL{}
436 if r.baseURL != nil {
437 *finalURL = *r.baseURL
438 }
439 finalURL.Path = p
440
441 query := url.Values{}
442 for key, values := range r.params {
443 for _, value := range values {
444 query.Add(key, value)
445 }
446 }
447
448 // timeout is handled specially here.
449 if r.timeout != 0 {
450 query.Set("timeout", r.timeout.String())
451 }
452 finalURL.RawQuery = query.Encode()
453 return finalURL
454}
455
// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
// parameter values will be replaced by "{value}". The receiver is taken by value, so
// overwriting r.params here does not change the caller's Request.
func (r Request) finalURLTemplate() url.URL {
	// Replace every query parameter value with the placeholder "{value}".
	newParams := url.Values{}
	v := []string{"{value}"}
	for k := range r.params {
		newParams[k] = v
	}
	r.params = newParams
	url := r.URL()
	segments := strings.Split(r.URL().Path, "/")
	groupIndex := 0
	index := 0
	// Skip over the segments contributed by the base URL path, when the final
	// path contains it. (URL() always returns a non-nil pointer.)
	if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
		groupIndex += len(strings.Split(r.baseURL.Path, "/"))
	}
	// Nothing left after the base path: return the URL unchanged.
	if groupIndex >= len(segments) {
		return *url
	}

	const CoreGroupPrefix = "api"
	const NamedGroupPrefix = "apis"
	isCoreGroup := segments[groupIndex] == CoreGroupPrefix
	isNamedGroup := segments[groupIndex] == NamedGroupPrefix
	if isCoreGroup {
		// checking the case of core group with /api/v1/... format
		index = groupIndex + 2
	} else if isNamedGroup {
		// checking the case of named group with /apis/apps/v1/... format
		index = groupIndex + 3
	} else {
		// this should not happen that the only two possibilities are /api... and /apis..., just want to put an
		// outlet here in case more API groups are added in future if ever possible:
		// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
		// if a wrong API groups name is encountered, return the {prefix} for url.Path
		url.Path = "/{prefix}"
		url.RawQuery = ""
		return *url
	}
	// From here on, index is the position of the first segment after the
	// group/version part; the cases below are keyed on how many segments remain.
	switch {
	// one remaining segment: resource (with no name), do nothing
	case len(segments)-index == 2:
		// /$RESOURCE/$NAME: replace $NAME with {name}
		segments[index+1] = "{name}"
	case len(segments)-index == 3:
		if segments[index+2] == "finalize" || segments[index+2] == "status" {
			// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
			segments[index+1] = "{name}"
		} else {
			// /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
			segments[index+1] = "{namespace}"
		}
	case len(segments)-index >= 4:
		segments[index+1] = "{namespace}"
		// /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
		if segments[index+3] != "finalize" && segments[index+3] != "status" {
			// the fourth segment is a name rather than a sub-resource: replace it with {name}
			segments[index+3] = "{name}"
		}
	}
	url.Path = path.Join(segments...)
	return *url
}
523
524func (r *Request) tryThrottle() {
525 now := time.Now()
526 if r.throttle != nil {
527 r.throttle.Accept()
528 }
529 if latency := time.Since(now); latency > longThrottleLatency {
530 klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
531 }
532}
533
534// Watch attempts to begin watching the requested location.
535// Returns a watch.Interface, or an error.
536func (r *Request) Watch() (watch.Interface, error) {
537 return r.WatchWithSpecificDecoders(
538 func(body io.ReadCloser) streaming.Decoder {
539 framer := r.serializers.Framer.NewFrameReader(body)
540 return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
541 },
542 r.serializers.Decoder,
543 )
544}
545
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// wrapperDecoderFn builds the "standard" decoder for the watch event envelope from the response
// body, and embeddedDecoder is the "personal" decoder for the content carried by each event.
// Returns a watch.Interface, or an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
	// We specifically don't want to rate limit watches, so we
	// don't use r.throttle here.
	if r.err != nil {
		return nil, r.err
	}
	if r.serializers.Framer == nil {
		return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
	}

	url := r.URL().String()
	req, err := http.NewRequest(r.verb, url, r.body)
	if err != nil {
		return nil, err
	}
	if r.ctx != nil {
		req = req.WithContext(r.ctx)
	}
	req.Header = r.headers
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}
	// Wait out any backoff before sending, then record the outcome in the
	// metrics and, when a base URL is set, in the backoff manager.
	r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
	resp, err := client.Do(req)
	updateURLMetrics(r, resp, err)
	if r.baseURL != nil {
		if err != nil {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
		} else {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
		}
	}
	if err != nil {
		// The watch stream mechanism handles many common partial data errors, so closed
		// connections can be retried in many cases.
		if net.IsProbableEOF(err) {
			return watch.NewEmptyWatch(), nil
		}
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		defer resp.Body.Close()
		// Prefer the error derived from the response; fall back to a generic
		// status error when the transformed result carries none.
		if result := r.transformResponse(resp, req); result.err != nil {
			return nil, result.err
		}
		return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
	}
	// On success the response body is handed to the decoder and is not closed here.
	wrapperDecoder := wrapperDecoderFn(resp.Body)
	return watch.NewStreamWatcher(
		restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
		// use 500 to indicate that the cause of the error is unknown - other error codes
		// are more specific to HTTP interactions, and set a reason
		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
	), nil
}
605
606// updateURLMetrics is a convenience function for pushing metrics.
607// It also handles corner cases for incomplete/invalid request data.
608func updateURLMetrics(req *Request, resp *http.Response, err error) {
609 url := "none"
610 if req.baseURL != nil {
611 url = req.baseURL.Host
612 }
613
614 // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
615 // system so we just report them as `<error>`.
616 if err != nil {
617 metrics.RequestResult.Increment("<error>", req.verb, url)
618 } else {
619 //Metrics for failure codes
620 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
621 }
622}
623
624// Stream formats and executes the request, and offers streaming of the response.
625// Returns io.ReadCloser which could be used for streaming of the response, or an error
626// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
627// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
628func (r *Request) Stream() (io.ReadCloser, error) {
629 if r.err != nil {
630 return nil, r.err
631 }
632
633 r.tryThrottle()
634
635 url := r.URL().String()
636 req, err := http.NewRequest(r.verb, url, nil)
637 if err != nil {
638 return nil, err
639 }
640 if r.ctx != nil {
641 req = req.WithContext(r.ctx)
642 }
643 req.Header = r.headers
644 client := r.client
645 if client == nil {
646 client = http.DefaultClient
647 }
648 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
649 resp, err := client.Do(req)
650 updateURLMetrics(r, resp, err)
651 if r.baseURL != nil {
652 if err != nil {
653 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
654 } else {
655 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
656 }
657 }
658 if err != nil {
659 return nil, err
660 }
661
662 switch {
663 case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
664 return resp.Body, nil
665
666 default:
667 // ensure we close the body before returning the error
668 defer resp.Body.Close()
669
670 result := r.transformResponse(resp, req)
671 err := result.Error()
672 if err == nil {
673 err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
674 }
675 return nil, err
676 }
677}
678
679// request connects to the server and invokes the provided function when a server response is
680// received. It handles retry behavior and up front validation of requests. It will invoke
681// fn at most once. It will return an error if a problem occurred prior to connecting to the
682// server - the provided function is responsible for handling server errors.
683func (r *Request) request(fn func(*http.Request, *http.Response)) error {
684 //Metrics for total request latency
685 start := time.Now()
686 defer func() {
687 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
688 }()
689
690 if r.err != nil {
691 klog.V(4).Infof("Error in request: %v", r.err)
692 return r.err
693 }
694
695 // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
696 if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
697 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
698 }
699 if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
700 return fmt.Errorf("an empty namespace may not be set during creation")
701 }
702
703 client := r.client
704 if client == nil {
705 client = http.DefaultClient
706 }
707
708 // Right now we make about ten retry attempts if we get a Retry-After response.
709 maxRetries := 10
710 retries := 0
711 for {
712 url := r.URL().String()
713 req, err := http.NewRequest(r.verb, url, r.body)
714 if err != nil {
715 return err
716 }
717 if r.timeout > 0 {
718 if r.ctx == nil {
719 r.ctx = context.Background()
720 }
721 var cancelFn context.CancelFunc
722 r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
723 defer cancelFn()
724 }
725 if r.ctx != nil {
726 req = req.WithContext(r.ctx)
727 }
728 req.Header = r.headers
729
730 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
731 if retries > 0 {
732 // We are retrying the request that we already send to apiserver
733 // at least once before.
734 // This request should also be throttled with the client-internal throttler.
735 r.tryThrottle()
736 }
737 resp, err := client.Do(req)
738 updateURLMetrics(r, resp, err)
739 if err != nil {
740 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
741 } else {
742 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
743 }
744 if err != nil {
745 // "Connection reset by peer" is usually a transient error.
746 // Thus in case of "GET" operations, we simply retry it.
747 // We are not automatically retrying "write" operations, as
748 // they are not idempotent.
749 if !net.IsConnectionReset(err) || r.verb != "GET" {
750 return err
751 }
752 // For the purpose of retry, we set the artificial "retry-after" response.
753 // TODO: Should we clean the original response if it exists?
754 resp = &http.Response{
755 StatusCode: http.StatusInternalServerError,
756 Header: http.Header{"Retry-After": []string{"1"}},
757 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
758 }
759 }
760
761 done := func() bool {
762 // Ensure the response body is fully read and closed
763 // before we reconnect, so that we reuse the same TCP
764 // connection.
765 defer func() {
766 const maxBodySlurpSize = 2 << 10
767 if resp.ContentLength <= maxBodySlurpSize {
768 io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
769 }
770 resp.Body.Close()
771 }()
772
773 retries++
774 if seconds, wait := checkWait(resp); wait && retries < maxRetries {
775 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
776 _, err := seeker.Seek(0, 0)
777 if err != nil {
778 klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
779 fn(req, resp)
780 return true
781 }
782 }
783
784 klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
785 r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
786 return false
787 }
788 fn(req, resp)
789 return true
790 }()
791 if done {
792 return nil
793 }
794 }
795}
796
797// Do formats and executes the request. Returns a Result object for easy response
798// processing.
799//
800// Error type:
801// * If the request can't be constructed, or an error happened earlier while building its
802// arguments: *RequestConstructionError
803// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
804// * http.Client.Do errors are returned directly.
805func (r *Request) Do() Result {
806 r.tryThrottle()
807
808 var result Result
809 err := r.request(func(req *http.Request, resp *http.Response) {
810 result = r.transformResponse(resp, req)
811 })
812 if err != nil {
813 return Result{err: err}
814 }
815 return result
816}
817
818// DoRaw executes the request but does not process the response body.
819func (r *Request) DoRaw() ([]byte, error) {
820 r.tryThrottle()
821
822 var result Result
823 err := r.request(func(req *http.Request, resp *http.Response) {
824 result.body, result.err = ioutil.ReadAll(resp.Body)
825 glogBody("Response Body", result.body)
826 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
827 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
828 }
829 })
830 if err != nil {
831 return nil, err
832 }
833 return result.body, result.err
834}
835
// transformResponse converts an API response into a Result. It reads the whole
// response body, picks a decoder that matches the response Content-Type
// (renegotiating one when it differs from the configured content type), and, for
// statuses outside the 200..206 range other than 101, attaches an unstructured
// error derived from the body.
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
	var body []byte
	if resp.Body != nil {
		data, err := ioutil.ReadAll(resp.Body)
		switch err.(type) {
		case nil:
			body = data
		case http2.StreamError:
			// This is trying to catch the scenario that the server may close the connection when sending the
			// response body. This can be caused by server timeout due to a slow network connection.
			// TODO: Add test for this. Steps may be:
			// 1. client-go (or kubectl) sends a GET request.
			// 2. Apiserver sends back the headers and then part of the body
			// 3. Apiserver closes connection.
			// 4. client-go should catch this and return an error.
			klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
			streamErr := fmt.Errorf("Stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
			return Result{
				err: streamErr,
			}
		default:
			// Any other read failure is logged and returned without a body.
			klog.Errorf("Unexpected error when reading response body: %v", err)
			unexpectedErr := fmt.Errorf("Unexpected error when reading response body. Please retry. Original error: %v", err)
			return Result{
				err: unexpectedErr,
			}
		}
	}

	glogBody("Response Body", body)

	// verify the content type is accurate
	contentType := resp.Header.Get("Content-Type")
	decoder := r.serializers.Decoder
	// Renegotiate when the server declared a content type and either no decoder
	// is configured or the declared type differs from the configured one.
	if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
		mediaType, params, err := mime.ParseMediaType(contentType)
		if err != nil {
			return Result{err: errors.NewInternalError(err)}
		}
		decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
		if err != nil {
			// if we fail to negotiate a decoder, treat this as an unstructured error
			switch {
			case resp.StatusCode == http.StatusSwitchingProtocols:
				// no-op, we've been upgraded
			case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
				return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
			}
			// Success status but no usable decoder: return the raw body undecoded.
			return Result{
				body:        body,
				contentType: contentType,
				statusCode:  resp.StatusCode,
			}
		}
	}

	switch {
	case resp.StatusCode == http.StatusSwitchingProtocols:
		// no-op, we've been upgraded
	case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
		// calculate an unstructured error from the response which the Result object may use if the caller
		// did not return a structured error.
		retryAfter, _ := retryAfterSeconds(resp)
		err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
		return Result{
			body:        body,
			contentType: contentType,
			statusCode:  resp.StatusCode,
			decoder:     decoder,
			err:         err,
		}
	}

	return Result{
		body:        body,
		contentType: contentType,
		statusCode:  resp.StatusCode,
		decoder:     decoder,
	}
}
917
918// truncateBody decides if the body should be truncated, based on the glog Verbosity.
919func truncateBody(body string) string {
920 max := 0
921 switch {
922 case bool(klog.V(10)):
923 return body
924 case bool(klog.V(9)):
925 max = 10240
926 case bool(klog.V(8)):
927 max = 1024
928 }
929
930 if len(body) <= max {
931 return body
932 }
933
934 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
935}
936
937// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
938// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
939// whether the body is printable.
940func glogBody(prefix string, body []byte) {
941 if klog.V(8) {
942 if bytes.IndexFunc(body, func(r rune) bool {
943 return r < 0x0a
944 }) != -1 {
945 klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
946 } else {
947 klog.Infof("%s: %s", prefix, truncateBody(string(body)))
948 }
949 }
950}
951
// maxUnstructuredResponseTextBytes caps how many bytes of a response body are
// read for inclusion in an unstructured error (2 KiB).
const maxUnstructuredResponseTextBytes = 2 << 10
954
955// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
956// It is expected to transform any response that is not recognizable as a clear server sent error from the
957// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
958// introduce a level of uncertainty to the responses returned by servers that in common use result in
959// unexpected responses. The rough structure is:
960//
961// 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
962// - this is the happy path
963// - when you get this output, trust what the server sends
964// 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
965// generate a reasonable facsimile of the original failure.
966// - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
967// 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
968// 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
969// initial contact, the presence of mismatched body contents from posted content types
970// - Give these a separate distinct error type and capture as much as possible of the original message
971//
972// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
973func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
974 if body == nil && resp.Body != nil {
975 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
976 body = data
977 }
978 }
979 retryAfter, _ := retryAfterSeconds(resp)
980 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
981}
982
983// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
984func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
985 // cap the amount of output we create
986 if len(body) > maxUnstructuredResponseTextBytes {
987 body = body[:maxUnstructuredResponseTextBytes]
988 }
989
990 message := "unknown"
991 if isTextResponse {
992 message = strings.TrimSpace(string(body))
993 }
994 var groupResource schema.GroupResource
995 if len(r.resource) > 0 {
996 groupResource.Group = r.content.GroupVersion.Group
997 groupResource.Resource = r.resource
998 }
999 return errors.NewGenericServerResponse(
1000 statusCode,
1001 method,
1002 groupResource,
1003 r.resourceName,
1004 message,
1005 retryAfter,
1006 true,
1007 )
1008}
1009
// isTextResponse returns true if the response appears to be a textual media type.
// A response without a Content-Type header is assumed to be text. Otherwise the
// header must parse as a media type whose name starts with "text/"; a header that
// cannot be parsed is treated as non-text.
func isTextResponse(resp *http.Response) bool {
	header := resp.Header.Get("Content-Type")
	if header == "" {
		return true
	}
	mediaType, _, err := mime.ParseMediaType(header)
	return err == nil && strings.HasPrefix(mediaType, "text/")
}
1022
1023// checkWait returns true along with a number of seconds if the server instructed us to wait
1024// before retrying.
1025func checkWait(resp *http.Response) (int, bool) {
1026 switch r := resp.StatusCode; {
1027 // any 500 error code and 429 can trigger a wait
1028 case r == http.StatusTooManyRequests, r >= 500:
1029 default:
1030 return 0, false
1031 }
1032 i, ok := retryAfterSeconds(resp)
1033 return i, ok
1034}
1035
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
// the header was missing or not a valid number. Only the integer form of the header is
// understood; any other value (for example a date) is reported as 0 and false.
func retryAfterSeconds(resp *http.Response) (int, bool) {
	value := resp.Header.Get("Retry-After")
	if value == "" {
		return 0, false
	}
	seconds, err := strconv.Atoi(value)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
1046
// Result contains the result of calling Request.Do().
type Result struct {
	// body is the raw response payload; Raw returns it as-is and Get/Into decode it.
	body []byte
	// contentType is reported in the errors produced when no decoder is available
	// or when the body is empty.
	contentType string
	// err is the error recorded for the request, if any; see Error for how it may
	// be replaced by a Status decoded from body.
	err error
	// statusCode is the HTTP status code exposed through StatusCode and WasCreated.
	statusCode int

	// decoder turns body into a runtime.Object; it may be nil, in which case Get and
	// Into fail with a "serializer ... doesn't exist" error.
	decoder runtime.Decoder
}
1056
// Raw returns the raw result: the undecoded response body together with the error
// recorded on the Result. The body is returned unchanged even when the error is non-nil,
// and no attempt is made to extract a Status from it.
func (r Result) Raw() ([]byte, error) {
	return r.body, r.err
}
1061
1062// Get returns the result as an object, which means it passes through the decoder.
1063// If the returned object is of type Status and has .Status != StatusSuccess, the
1064// additional information in Status will be used to enrich the error.
1065func (r Result) Get() (runtime.Object, error) {
1066 if r.err != nil {
1067 // Check whether the result has a Status object in the body and prefer that.
1068 return nil, r.Error()
1069 }
1070 if r.decoder == nil {
1071 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1072 }
1073
1074 // decode, but if the result is Status return that as an error instead.
1075 out, _, err := r.decoder.Decode(r.body, nil, nil)
1076 if err != nil {
1077 return nil, err
1078 }
1079 switch t := out.(type) {
1080 case *metav1.Status:
1081 // any status besides StatusSuccess is considered an error.
1082 if t.Status != metav1.StatusSuccess {
1083 return nil, errors.FromObject(t)
1084 }
1085 }
1086 return out, nil
1087}
1088
// StatusCode returns the HTTP status code of the request. (Only valid if no
// error was returned.) The code is written through statusCode, which is dereferenced
// unconditionally and therefore must not be nil; the Result itself is returned so that
// further calls can be chained.
func (r Result) StatusCode(statusCode *int) Result {
	*statusCode = r.statusCode
	return r
}
1095
1096// Into stores the result into obj, if possible. If obj is nil it is ignored.
1097// If the returned object is of type Status and has .Status != StatusSuccess, the
1098// additional information in Status will be used to enrich the error.
1099func (r Result) Into(obj runtime.Object) error {
1100 if r.err != nil {
1101 // Check whether the result has a Status object in the body and prefer that.
1102 return r.Error()
1103 }
1104 if r.decoder == nil {
1105 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1106 }
1107 if len(r.body) == 0 {
1108 return fmt.Errorf("0-length response with status code: %d and content type: %s",
1109 r.statusCode, r.contentType)
1110 }
1111
1112 out, _, err := r.decoder.Decode(r.body, nil, obj)
1113 if err != nil || out == obj {
1114 return err
1115 }
1116 // if a different object is returned, see if it is Status and avoid double decoding
1117 // the object.
1118 switch t := out.(type) {
1119 case *metav1.Status:
1120 // any status besides StatusSuccess is considered an error.
1121 if t.Status != metav1.StatusSuccess {
1122 return errors.FromObject(t)
1123 }
1124 }
1125 return nil
1126}
1127
// WasCreated updates the provided bool pointer to whether the server returned
// 201 created or a different response. The pointer is dereferenced unconditionally
// and therefore must not be nil; the Result itself is returned so that further calls
// can be chained.
func (r Result) WasCreated(wasCreated *bool) Result {
	*wasCreated = r.statusCode == http.StatusCreated
	return r
}
1134
1135// Error returns the error executing the request, nil if no error occurred.
1136// If the returned object is of type Status and has Status != StatusSuccess, the
1137// additional information in Status will be used to enrich the error.
1138// See the Request.Do() comment for what errors you might get.
1139func (r Result) Error() error {
1140 // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1141 // a Status object.
1142 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1143 return r.err
1144 }
1145
1146 // attempt to convert the body into a Status object
1147 // to be backwards compatible with old servers that do not return a version, default to "v1"
1148 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1149 if err != nil {
1150 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1151 return r.err
1152 }
1153 switch t := out.(type) {
1154 case *metav1.Status:
1155 // because we default the kind, we *must* check for StatusFailure
1156 if t.Status == metav1.StatusFailure {
1157 return errors.FromObject(t)
1158 }
1159 }
1160 return r.err
1161}
1162
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store).
// IsValidPathSegmentName rejects a name that is exactly equal to one of these entries.
var NameMayNotBe = []string{".", ".."}
1165
// NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store).
// IsValidPathSegmentName and IsValidPathSegmentPrefix report every entry that occurs anywhere in a name.
var NameMayNotContain = []string{"/", "%"}
1168
1169// IsValidPathSegmentName validates the name can be safely encoded as a path segment
1170func IsValidPathSegmentName(name string) []string {
1171 for _, illegalName := range NameMayNotBe {
1172 if name == illegalName {
1173 return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1174 }
1175 }
1176
1177 var errors []string
1178 for _, illegalContent := range NameMayNotContain {
1179 if strings.Contains(name, illegalContent) {
1180 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1181 }
1182 }
1183
1184 return errors
1185}
1186
1187// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
1188// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
1189func IsValidPathSegmentPrefix(name string) []string {
1190 var errors []string
1191 for _, illegalContent := range NameMayNotContain {
1192 if strings.Contains(name, illegalContent) {
1193 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1194 }
1195 }
1196
1197 return errors
1198}
1199
1200// ValidatePathSegmentName validates the name can be safely encoded as a path segment
1201func ValidatePathSegmentName(name string, prefix bool) []string {
1202 if prefix {
1203 return IsValidPathSegmentPrefix(name)
1204 }
1205 return IsValidPathSegmentName(name)
1206}