blob: 64901fba20d88b42d0c7cbf43eb7545e6e88579f [file] [log] [blame]
sslobodrd046be82019-01-16 10:02:22 -05001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package rest
18
19import (
20 "bytes"
21 "context"
22 "encoding/hex"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "mime"
27 "net/http"
28 "net/url"
29 "path"
30 "reflect"
31 "strconv"
32 "strings"
33 "time"
34
35 "golang.org/x/net/http2"
36 "k8s.io/apimachinery/pkg/api/errors"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
41 "k8s.io/apimachinery/pkg/util/net"
42 "k8s.io/apimachinery/pkg/watch"
43 restclientwatch "k8s.io/client-go/rest/watch"
44 "k8s.io/client-go/tools/metrics"
45 "k8s.io/client-go/util/flowcontrol"
46 "k8s.io/klog"
47)
48
var (
	// longThrottleLatency is the threshold for logging throttled requests: any
	// request that is throttled for longer than longThrottleLatency is logged.
	longThrottleLatency = 50 * time.Millisecond
)
54
// HTTPClient is the minimal interface a Request needs in order to execute an
// HTTP call. Abstracting it allows a substitute client to be injected when
// testing a request object.
type HTTPClient interface {
	// Do sends req and returns the resulting response or an error.
	Do(req *http.Request) (*http.Response, error)
}
59
// ResponseWrapper is an interface for getting a response.
// The response may be accessed either as raw data (the whole output is read
// into memory) or as a stream.
type ResponseWrapper interface {
	// DoRaw returns the complete response body as a byte slice.
	DoRaw() ([]byte, error)
	// Stream returns the response body as a stream that the caller reads from
	// and closes.
	Stream() (io.ReadCloser, error)
}
66
// RequestConstructionError wraps an error that occurred while a request was
// being assembled.
type RequestConstructionError struct {
	// Err is the underlying cause of the construction failure.
	Err error
}

// Error implements the error interface; the message embeds the wrapped error
// in single quotes.
func (r *RequestConstructionError) Error() string {
	cause := fmt.Sprint(r.Err)
	return "request construction error: '" + cause + "'"
}
76
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
	// required
	client HTTPClient // executes the HTTP call; http.DefaultClient is used when nil
	verb   string     // HTTP method passed to http.NewRequest

	baseURL     *url.URL      // base of every URL produced by URL(); may be nil
	content     ContentConfig // supplies ContentType, AcceptContentTypes and GroupVersion
	serializers Serializers   // encoder/decoders used for bodies, responses and watch streams

	// generic components accessible via method setters
	pathPrefix string      // leading part of the request path (see Prefix, AbsPath, RequestURI)
	subpath    string      // trailing part of the request path (see Suffix)
	params     url.Values  // query parameters; allocated lazily
	headers    http.Header // request headers; allocated lazily by SetHeader

	// structural elements of the request that are part of the Kubernetes API conventions
	namespace    string
	namespaceSet bool // true once Namespace has been called, even with an empty namespace
	resource     string
	resourceName string
	subresource  string
	timeout      time.Duration // when non-zero, sent as the "timeout" query parameter by URL()

	// output
	err  error     // first error recorded while building the request
	body io.Reader // request body set by Body

	// This is only used for per-request timeouts, deadlines, and cancellations.
	ctx context.Context

	backoffMgr BackoffManager          // never nil after NewRequest/BackOff; NoBackoff is the default
	throttle   flowcontrol.RateLimiter // optional client-side rate limiter; nil disables throttling
}
113
114// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
115func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {
116 if backoff == nil {
117 klog.V(2).Infof("Not implementing request backoff strategy.")
118 backoff = &NoBackoff{}
119 }
120
121 pathPrefix := "/"
122 if baseURL != nil {
123 pathPrefix = path.Join(pathPrefix, baseURL.Path)
124 }
125 r := &Request{
126 client: client,
127 verb: verb,
128 baseURL: baseURL,
129 pathPrefix: path.Join(pathPrefix, versionedAPIPath),
130 content: content,
131 serializers: serializers,
132 backoffMgr: backoff,
133 throttle: throttle,
134 timeout: timeout,
135 }
136 switch {
137 case len(content.AcceptContentTypes) > 0:
138 r.SetHeader("Accept", content.AcceptContentTypes)
139 case len(content.ContentType) > 0:
140 r.SetHeader("Accept", content.ContentType+", */*")
141 }
142 return r
143}
144
145// Prefix adds segments to the relative beginning to the request path. These
146// items will be placed before the optional Namespace, Resource, or Name sections.
147// Setting AbsPath will clear any previously set Prefix segments
148func (r *Request) Prefix(segments ...string) *Request {
149 if r.err != nil {
150 return r
151 }
152 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
153 return r
154}
155
156// Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
157// Namespace, Resource, or Name sections.
158func (r *Request) Suffix(segments ...string) *Request {
159 if r.err != nil {
160 return r
161 }
162 r.subpath = path.Join(r.subpath, path.Join(segments...))
163 return r
164}
165
166// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
167func (r *Request) Resource(resource string) *Request {
168 if r.err != nil {
169 return r
170 }
171 if len(r.resource) != 0 {
172 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
173 return r
174 }
175 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
176 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
177 return r
178 }
179 r.resource = resource
180 return r
181}
182
183// BackOff sets the request's backoff manager to the one specified,
184// or defaults to the stub implementation if nil is provided
185func (r *Request) BackOff(manager BackoffManager) *Request {
186 if manager == nil {
187 r.backoffMgr = &NoBackoff{}
188 return r
189 }
190
191 r.backoffMgr = manager
192 return r
193}
194
// Throttle receives a rate-limiter and sets or replaces an existing request limiter.
// Unlike most builder methods it does not check for a previously recorded error.
func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
	r.throttle = limiter
	return r
}
200
201// SubResource sets a sub-resource path which can be multiple segments after the resource
202// name but before the suffix.
203func (r *Request) SubResource(subresources ...string) *Request {
204 if r.err != nil {
205 return r
206 }
207 subresource := path.Join(subresources...)
208 if len(r.subresource) != 0 {
209 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
210 return r
211 }
212 for _, s := range subresources {
213 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
214 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
215 return r
216 }
217 }
218 r.subresource = subresource
219 return r
220}
221
222// Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
223func (r *Request) Name(resourceName string) *Request {
224 if r.err != nil {
225 return r
226 }
227 if len(resourceName) == 0 {
228 r.err = fmt.Errorf("resource name may not be empty")
229 return r
230 }
231 if len(r.resourceName) != 0 {
232 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
233 return r
234 }
235 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
236 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
237 return r
238 }
239 r.resourceName = resourceName
240 return r
241}
242
243// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
244func (r *Request) Namespace(namespace string) *Request {
245 if r.err != nil {
246 return r
247 }
248 if r.namespaceSet {
249 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
250 return r
251 }
252 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
253 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
254 return r
255 }
256 r.namespaceSet = true
257 r.namespace = namespace
258 return r
259}
260
261// NamespaceIfScoped is a convenience function to set a namespace if scoped is true
262func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
263 if scoped {
264 return r.Namespace(namespace)
265 }
266 return r
267}
268
269// AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
270// when a single segment is passed.
271func (r *Request) AbsPath(segments ...string) *Request {
272 if r.err != nil {
273 return r
274 }
275 r.pathPrefix = path.Join(r.baseURL.Path, path.Join(segments...))
276 if len(segments) == 1 && (len(r.baseURL.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
277 // preserve any trailing slashes for legacy behavior
278 r.pathPrefix += "/"
279 }
280 return r
281}
282
283// RequestURI overwrites existing path and parameters with the value of the provided server relative
284// URI.
285func (r *Request) RequestURI(uri string) *Request {
286 if r.err != nil {
287 return r
288 }
289 locator, err := url.Parse(uri)
290 if err != nil {
291 r.err = err
292 return r
293 }
294 r.pathPrefix = locator.Path
295 if len(locator.Query()) > 0 {
296 if r.params == nil {
297 r.params = make(url.Values)
298 }
299 for k, v := range locator.Query() {
300 r.params[k] = v
301 }
302 }
303 return r
304}
305
306// Param creates a query parameter with the given string value.
307func (r *Request) Param(paramName, s string) *Request {
308 if r.err != nil {
309 return r
310 }
311 return r.setParam(paramName, s)
312}
313
314// VersionedParams will take the provided object, serialize it to a map[string][]string using the
315// implicit RESTClient API version and the default parameter codec, and then add those as parameters
316// to the request. Use this to provide versioned query parameters from client libraries.
317// VersionedParams will not write query parameters that have omitempty set and are empty. If a
318// parameter has already been set it is appended to (Params and VersionedParams are additive).
319func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
320 return r.SpecificallyVersionedParams(obj, codec, *r.content.GroupVersion)
321}
322
323func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
324 if r.err != nil {
325 return r
326 }
327 params, err := codec.EncodeParameters(obj, version)
328 if err != nil {
329 r.err = err
330 return r
331 }
332 for k, v := range params {
333 if r.params == nil {
334 r.params = make(url.Values)
335 }
336 r.params[k] = append(r.params[k], v...)
337 }
338 return r
339}
340
341func (r *Request) setParam(paramName, value string) *Request {
342 if r.params == nil {
343 r.params = make(url.Values)
344 }
345 r.params[paramName] = append(r.params[paramName], value)
346 return r
347}
348
349func (r *Request) SetHeader(key string, values ...string) *Request {
350 if r.headers == nil {
351 r.headers = http.Header{}
352 }
353 r.headers.Del(key)
354 for _, value := range values {
355 r.headers.Add(key, value)
356 }
357 return r
358}
359
360// Timeout makes the request use the given duration as an overall timeout for the
361// request. Additionally, if set passes the value as "timeout" parameter in URL.
362func (r *Request) Timeout(d time.Duration) *Request {
363 if r.err != nil {
364 return r
365 }
366 r.timeout = d
367 return r
368}
369
370// Body makes the request use obj as the body. Optional.
371// If obj is a string, try to read a file of that name.
372// If obj is a []byte, send it directly.
373// If obj is an io.Reader, use it directly.
374// If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
375// If obj is a runtime.Object and nil, do nothing.
376// Otherwise, set an error.
377func (r *Request) Body(obj interface{}) *Request {
378 if r.err != nil {
379 return r
380 }
381 switch t := obj.(type) {
382 case string:
383 data, err := ioutil.ReadFile(t)
384 if err != nil {
385 r.err = err
386 return r
387 }
388 glogBody("Request Body", data)
389 r.body = bytes.NewReader(data)
390 case []byte:
391 glogBody("Request Body", t)
392 r.body = bytes.NewReader(t)
393 case io.Reader:
394 r.body = t
395 case runtime.Object:
396 // callers may pass typed interface pointers, therefore we must check nil with reflection
397 if reflect.ValueOf(t).IsNil() {
398 return r
399 }
400 data, err := runtime.Encode(r.serializers.Encoder, t)
401 if err != nil {
402 r.err = err
403 return r
404 }
405 glogBody("Request Body", data)
406 r.body = bytes.NewReader(data)
407 r.SetHeader("Content-Type", r.content.ContentType)
408 default:
409 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
410 }
411 return r
412}
413
// Context adds a context to the request. Contexts are only used for
// timeouts, deadlines, and cancellations. Any previously set context is
// replaced; no check for a previously recorded error is made.
func (r *Request) Context(ctx context.Context) *Request {
	r.ctx = ctx
	return r
}
420
421// URL returns the current working URL.
422func (r *Request) URL() *url.URL {
423 p := r.pathPrefix
424 if r.namespaceSet && len(r.namespace) > 0 {
425 p = path.Join(p, "namespaces", r.namespace)
426 }
427 if len(r.resource) != 0 {
428 p = path.Join(p, strings.ToLower(r.resource))
429 }
430 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
431 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
432 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
433 }
434
435 finalURL := &url.URL{}
436 if r.baseURL != nil {
437 *finalURL = *r.baseURL
438 }
439 finalURL.Path = p
440
441 query := url.Values{}
442 for key, values := range r.params {
443 for _, value := range values {
444 query.Add(key, value)
445 }
446 }
447
448 // timeout is handled specially here.
449 if r.timeout != 0 {
450 query.Set("timeout", r.timeout.String())
451 }
452 finalURL.RawQuery = query.Encode()
453 return finalURL
454}
455
// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
// parameter values will be replaced by "{value}". The receiver is a value, so the
// parameter rewrite below only touches a copy and the caller's Request is unchanged.
func (r Request) finalURLTemplate() url.URL {
	// Replace every query parameter value with the same placeholder.
	newParams := url.Values{}
	v := []string{"{value}"}
	for k := range r.params {
		newParams[k] = v
	}
	r.params = newParams
	url := r.URL()
	segments := strings.Split(r.URL().Path, "/")
	groupIndex := 0
	index := 0
	// Skip the segments contributed by the base URL path so that groupIndex points at
	// the segment expected to be "api" or "apis".
	if r.URL() != nil && r.baseURL != nil && strings.Contains(r.URL().Path, r.baseURL.Path) {
		groupIndex += len(strings.Split(r.baseURL.Path, "/"))
	}
	// Path is too short to contain a group prefix; return the URL as built.
	if groupIndex >= len(segments) {
		return *url
	}

	const CoreGroupPrefix = "api"
	const NamedGroupPrefix = "apis"
	isCoreGroup := segments[groupIndex] == CoreGroupPrefix
	isNamedGroup := segments[groupIndex] == NamedGroupPrefix
	if isCoreGroup {
		// checking the case of core group with /api/v1/... format
		index = groupIndex + 2
	} else if isNamedGroup {
		// checking the case of named group with /apis/apps/v1/... format
		index = groupIndex + 3
	} else {
		// This should not happen, since the only two possibilities are /api... and /apis...;
		// it is an outlet in case more API groups are added in the future:
		// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
		// If an unknown group prefix is encountered, return {prefix} as the url.Path.
		url.Path = "/{prefix}"
		url.RawQuery = ""
		return *url
	}
	// The cases below are keyed on how many segments remain from index onwards.
	switch {
	// One remaining segment: a resource with no name, nothing to replace.
	case len(segments)-index == 2:
		// /$RESOURCE/$NAME: replace $NAME with {name}
		segments[index+1] = "{name}"
	case len(segments)-index == 3:
		if segments[index+2] == "finalize" || segments[index+2] == "status" {
			// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
			segments[index+1] = "{name}"
		} else {
			// /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
			segments[index+1] = "{namespace}"
		}
	case len(segments)-index >= 4:
		segments[index+1] = "{namespace}"
		// /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
		if segments[index+3] != "finalize" && segments[index+3] != "status" {
			// The fourth segment is a name rather than a subresource: replace it with {name}
			segments[index+3] = "{name}"
		}
	}
	url.Path = path.Join(segments...)
	return *url
}
523
524func (r *Request) tryThrottle() {
525 now := time.Now()
526 if r.throttle != nil {
527 r.throttle.Accept()
528 }
529 if latency := time.Since(now); latency > longThrottleLatency {
530 klog.V(4).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
531 }
532}
533
534// Watch attempts to begin watching the requested location.
535// Returns a watch.Interface, or an error.
536func (r *Request) Watch() (watch.Interface, error) {
537 return r.WatchWithSpecificDecoders(
538 func(body io.ReadCloser) streaming.Decoder {
539 framer := r.serializers.Framer.NewFrameReader(body)
540 return streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
541 },
542 r.serializers.Decoder,
543 )
544}
545
// WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder.
// wrapperDecoderFn builds the "standard" decoder for the watch events from the response body, and
// embeddedDecoder is the "personal" decoder used for the content of each event.
// Returns a watch.Interface, or an error. A request error that looks like an EOF yields an
// empty watch instead of an error; any status other than 200 OK yields an error.
func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
	// We specifically don't want to rate limit watches, so we
	// don't use r.throttle here.
	if r.err != nil {
		return nil, r.err
	}
	// Without a framer the event stream cannot be split into individual events.
	if r.serializers.Framer == nil {
		return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
	}

	url := r.URL().String()
	req, err := http.NewRequest(r.verb, url, r.body)
	if err != nil {
		return nil, err
	}
	if r.ctx != nil {
		req = req.WithContext(r.ctx)
	}
	req.Header = r.headers
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}
	r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
	resp, err := client.Do(req)
	updateURLMetrics(r, resp, err)
	// Feed the outcome back to the backoff manager; status code 0 stands for a transport error.
	if r.baseURL != nil {
		if err != nil {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
		} else {
			r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
		}
	}
	if err != nil {
		// The watch stream mechanism handles many common partial data errors, so closed
		// connections can be retried in many cases.
		if net.IsProbableEOF(err) {
			return watch.NewEmptyWatch(), nil
		}
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		// The body is only consumed here to build the error, so close it on return.
		defer resp.Body.Close()
		if result := r.transformResponse(resp, req); result.err != nil {
			return nil, result.err
		}
		return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
	}
	// On success the response body is handed to the decoder and is not closed here.
	wrapperDecoder := wrapperDecoderFn(resp.Body)
	return watch.NewStreamWatcher(restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder)), nil
}
600
601// updateURLMetrics is a convenience function for pushing metrics.
602// It also handles corner cases for incomplete/invalid request data.
603func updateURLMetrics(req *Request, resp *http.Response, err error) {
604 url := "none"
605 if req.baseURL != nil {
606 url = req.baseURL.Host
607 }
608
609 // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
610 // system so we just report them as `<error>`.
611 if err != nil {
612 metrics.RequestResult.Increment("<error>", req.verb, url)
613 } else {
614 //Metrics for failure codes
615 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
616 }
617}
618
// Stream formats and executes the request, and offers streaming of the response.
// Returns io.ReadCloser which could be used for streaming of the response, or an error.
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
// On success the caller is responsible for closing the returned body.
func (r *Request) Stream() (io.ReadCloser, error) {
	if r.err != nil {
		return nil, r.err
	}

	r.tryThrottle()

	url := r.URL().String()
	// Note that the request is created without a body; r.body is not sent here.
	req, err := http.NewRequest(r.verb, url, nil)
	if err != nil {
		return nil, err
	}
	if r.ctx != nil {
		req = req.WithContext(r.ctx)
	}
	req.Header = r.headers
	client := r.client
	if client == nil {
		client = http.DefaultClient
	}
	r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
	resp, err := client.Do(req)
	updateURLMetrics(r, resp, err)
	// Feed the outcome back to the backoff manager; status code 0 stands for a transport error.
	if r.baseURL != nil {
		if err != nil {
			r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
		} else {
			r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
		}
	}
	if err != nil {
		return nil, err
	}

	switch {
	case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
		return resp.Body, nil

	default:
		// ensure we close the body before returning the error
		defer resp.Body.Close()

		result := r.transformResponse(resp, req)
		err := result.Error()
		if err == nil {
			// No structured error could be extracted; fall back to status code and raw body.
			err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
		}
		return nil, err
	}
}
673
674// request connects to the server and invokes the provided function when a server response is
675// received. It handles retry behavior and up front validation of requests. It will invoke
676// fn at most once. It will return an error if a problem occurred prior to connecting to the
677// server - the provided function is responsible for handling server errors.
678func (r *Request) request(fn func(*http.Request, *http.Response)) error {
679 //Metrics for total request latency
680 start := time.Now()
681 defer func() {
682 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
683 }()
684
685 if r.err != nil {
686 klog.V(4).Infof("Error in request: %v", r.err)
687 return r.err
688 }
689
690 // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
691 if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
692 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
693 }
694 if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
695 return fmt.Errorf("an empty namespace may not be set during creation")
696 }
697
698 client := r.client
699 if client == nil {
700 client = http.DefaultClient
701 }
702
703 // Right now we make about ten retry attempts if we get a Retry-After response.
704 maxRetries := 10
705 retries := 0
706 for {
707 url := r.URL().String()
708 req, err := http.NewRequest(r.verb, url, r.body)
709 if err != nil {
710 return err
711 }
712 if r.timeout > 0 {
713 if r.ctx == nil {
714 r.ctx = context.Background()
715 }
716 var cancelFn context.CancelFunc
717 r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
718 defer cancelFn()
719 }
720 if r.ctx != nil {
721 req = req.WithContext(r.ctx)
722 }
723 req.Header = r.headers
724
725 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
726 if retries > 0 {
727 // We are retrying the request that we already send to apiserver
728 // at least once before.
729 // This request should also be throttled with the client-internal throttler.
730 r.tryThrottle()
731 }
732 resp, err := client.Do(req)
733 updateURLMetrics(r, resp, err)
734 if err != nil {
735 r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
736 } else {
737 r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
738 }
739 if err != nil {
740 // "Connection reset by peer" is usually a transient error.
741 // Thus in case of "GET" operations, we simply retry it.
742 // We are not automatically retrying "write" operations, as
743 // they are not idempotent.
744 if !net.IsConnectionReset(err) || r.verb != "GET" {
745 return err
746 }
747 // For the purpose of retry, we set the artificial "retry-after" response.
748 // TODO: Should we clean the original response if it exists?
749 resp = &http.Response{
750 StatusCode: http.StatusInternalServerError,
751 Header: http.Header{"Retry-After": []string{"1"}},
752 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
753 }
754 }
755
756 done := func() bool {
757 // Ensure the response body is fully read and closed
758 // before we reconnect, so that we reuse the same TCP
759 // connection.
760 defer func() {
761 const maxBodySlurpSize = 2 << 10
762 if resp.ContentLength <= maxBodySlurpSize {
763 io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
764 }
765 resp.Body.Close()
766 }()
767
768 retries++
769 if seconds, wait := checkWait(resp); wait && retries < maxRetries {
770 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
771 _, err := seeker.Seek(0, 0)
772 if err != nil {
773 klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
774 fn(req, resp)
775 return true
776 }
777 }
778
779 klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
780 r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
781 return false
782 }
783 fn(req, resp)
784 return true
785 }()
786 if done {
787 return nil
788 }
789 }
790}
791
792// Do formats and executes the request. Returns a Result object for easy response
793// processing.
794//
795// Error type:
796// * If the request can't be constructed, or an error happened earlier while building its
797// arguments: *RequestConstructionError
798// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
799// * http.Client.Do errors are returned directly.
800func (r *Request) Do() Result {
801 r.tryThrottle()
802
803 var result Result
804 err := r.request(func(req *http.Request, resp *http.Response) {
805 result = r.transformResponse(resp, req)
806 })
807 if err != nil {
808 return Result{err: err}
809 }
810 return result
811}
812
813// DoRaw executes the request but does not process the response body.
814func (r *Request) DoRaw() ([]byte, error) {
815 r.tryThrottle()
816
817 var result Result
818 err := r.request(func(req *http.Request, resp *http.Response) {
819 result.body, result.err = ioutil.ReadAll(resp.Body)
820 glogBody("Response Body", result.body)
821 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
822 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
823 }
824 })
825 if err != nil {
826 return nil, err
827 }
828 return result.body, result.err
829}
830
// transformResponse converts an API response into a Result. It reads the whole
// response body, picks a decoder that matches the response content type, and for
// status codes outside 200..206 (other than 101 Switching Protocols) attaches an
// unstructured error built from the body.
func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
	var body []byte
	if resp.Body != nil {
		data, err := ioutil.ReadAll(resp.Body)
		switch err.(type) {
		case nil:
			body = data
		case http2.StreamError:
			// This is trying to catch the scenario that the server may close the connection when sending the
			// response body. This can be caused by server timeout due to a slow network connection.
			// TODO: Add test for this. Steps may be:
			// 1. client-go (or kubectl) sends a GET request.
			// 2. Apiserver sends back the headers and then part of the body
			// 3. Apiserver closes connection.
			// 4. client-go should catch this and return an error.
			klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
			streamErr := fmt.Errorf("Stream error %#v when reading response body, may be caused by closed connection. Please retry.", err)
			return Result{
				err: streamErr,
			}
		default:
			klog.Errorf("Unexpected error when reading response body: %#v", err)
			unexpectedErr := fmt.Errorf("Unexpected error %#v when reading response body. Please retry.", err)
			return Result{
				err: unexpectedErr,
			}
		}
	}

	glogBody("Response Body", body)

	// verify the content type is accurate
	contentType := resp.Header.Get("Content-Type")
	decoder := r.serializers.Decoder
	// Renegotiate the decoder when there is no default decoder, or when the server answered
	// with a content type different from the one this client is configured for.
	if len(contentType) > 0 && (decoder == nil || (len(r.content.ContentType) > 0 && contentType != r.content.ContentType)) {
		mediaType, params, err := mime.ParseMediaType(contentType)
		if err != nil {
			return Result{err: errors.NewInternalError(err)}
		}
		decoder, err = r.serializers.RenegotiatedDecoder(mediaType, params)
		if err != nil {
			// if we fail to negotiate a decoder, treat this as an unstructured error
			switch {
			case resp.StatusCode == http.StatusSwitchingProtocols:
				// no-op, we've been upgraded
			case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
				return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
			}
			// Successful status but no usable decoder: return the raw body without a decoder.
			return Result{
				body:        body,
				contentType: contentType,
				statusCode:  resp.StatusCode,
			}
		}
	}

	switch {
	case resp.StatusCode == http.StatusSwitchingProtocols:
		// no-op, we've been upgraded
	case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
		// calculate an unstructured error from the response which the Result object may use if the caller
		// did not return a structured error.
		retryAfter, _ := retryAfterSeconds(resp)
		err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
		return Result{
			body:        body,
			contentType: contentType,
			statusCode:  resp.StatusCode,
			decoder:     decoder,
			err:         err,
		}
	}

	return Result{
		body:        body,
		contentType: contentType,
		statusCode:  resp.StatusCode,
		decoder:     decoder,
	}
}
912
913// truncateBody decides if the body should be truncated, based on the glog Verbosity.
914func truncateBody(body string) string {
915 max := 0
916 switch {
917 case bool(klog.V(10)):
918 return body
919 case bool(klog.V(9)):
920 max = 10240
921 case bool(klog.V(8)):
922 max = 1024
923 }
924
925 if len(body) <= max {
926 return body
927 }
928
929 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
930}
931
932// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
933// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
934// whether the body is printable.
935func glogBody(prefix string, body []byte) {
936 if klog.V(8) {
937 if bytes.IndexFunc(body, func(r rune) bool {
938 return r < 0x0a
939 }) != -1 {
940 klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
941 } else {
942 klog.Infof("%s: %s", prefix, truncateBody(string(body)))
943 }
944 }
945}
946
// maxUnstructuredResponseTextBytes is an upper bound, in bytes, on how much of a
// response body is read for, and included in, an unstructured error.
const maxUnstructuredResponseTextBytes = 2048
949
950// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
951// It is expected to transform any response that is not recognizable as a clear server sent error from the
952// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
953// introduce a level of uncertainty to the responses returned by servers that in common use result in
954// unexpected responses. The rough structure is:
955//
956// 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
957// - this is the happy path
958// - when you get this output, trust what the server sends
959// 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
960// generate a reasonable facsimile of the original failure.
961// - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
962// 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
963// 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
964// initial contact, the presence of mismatched body contents from posted content types
965// - Give these a separate distinct error type and capture as much as possible of the original message
966//
967// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
968func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
969 if body == nil && resp.Body != nil {
970 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
971 body = data
972 }
973 }
974 retryAfter, _ := retryAfterSeconds(resp)
975 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
976}
977
978// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
979func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
980 // cap the amount of output we create
981 if len(body) > maxUnstructuredResponseTextBytes {
982 body = body[:maxUnstructuredResponseTextBytes]
983 }
984
985 message := "unknown"
986 if isTextResponse {
987 message = strings.TrimSpace(string(body))
988 }
989 var groupResource schema.GroupResource
990 if len(r.resource) > 0 {
991 groupResource.Group = r.content.GroupVersion.Group
992 groupResource.Resource = r.resource
993 }
994 return errors.NewGenericServerResponse(
995 statusCode,
996 method,
997 groupResource,
998 r.resourceName,
999 message,
1000 retryAfter,
1001 true,
1002 )
1003}
1004
// isTextResponse reports whether the response appears to be a textual media type. A
// response without a Content-Type header is treated as text, an unparseable Content-Type
// is not, and otherwise the parsed media type must start with "text/".
func isTextResponse(resp *http.Response) bool {
	header := resp.Header.Get("Content-Type")
	if header == "" {
		return true
	}
	mediaType, _, err := mime.ParseMediaType(header)
	return err == nil && strings.HasPrefix(mediaType, "text/")
}
1017
1018// checkWait returns true along with a number of seconds if the server instructed us to wait
1019// before retrying.
1020func checkWait(resp *http.Response) (int, bool) {
1021 switch r := resp.StatusCode; {
1022 // any 500 error code and 429 can trigger a wait
1023 case r == http.StatusTooManyRequests, r >= 500:
1024 default:
1025 return 0, false
1026 }
1027 i, ok := retryAfterSeconds(resp)
1028 return i, ok
1029}
1030
// retryAfterSeconds returns the integer value of the Retry-After header and true. It
// returns 0 and false when the header is missing or cannot be parsed as an integer.
func retryAfterSeconds(resp *http.Response) (int, bool) {
	header := resp.Header.Get("Retry-After")
	if header == "" {
		return 0, false
	}
	seconds, err := strconv.Atoi(header)
	if err != nil {
		return 0, false
	}
	return seconds, true
}
1041
1042// Result contains the result of calling Request.Do().
1043type Result struct {
1044 body []byte
1045 contentType string
1046 err error
1047 statusCode int
1048
1049 decoder runtime.Decoder
1050}
1051
1052// Raw returns the raw result.
1053func (r Result) Raw() ([]byte, error) {
1054 return r.body, r.err
1055}
1056
1057// Get returns the result as an object, which means it passes through the decoder.
1058// If the returned object is of type Status and has .Status != StatusSuccess, the
1059// additional information in Status will be used to enrich the error.
1060func (r Result) Get() (runtime.Object, error) {
1061 if r.err != nil {
1062 // Check whether the result has a Status object in the body and prefer that.
1063 return nil, r.Error()
1064 }
1065 if r.decoder == nil {
1066 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1067 }
1068
1069 // decode, but if the result is Status return that as an error instead.
1070 out, _, err := r.decoder.Decode(r.body, nil, nil)
1071 if err != nil {
1072 return nil, err
1073 }
1074 switch t := out.(type) {
1075 case *metav1.Status:
1076 // any status besides StatusSuccess is considered an error.
1077 if t.Status != metav1.StatusSuccess {
1078 return nil, errors.FromObject(t)
1079 }
1080 }
1081 return out, nil
1082}
1083
1084// StatusCode returns the HTTP status code of the request. (Only valid if no
1085// error was returned.)
1086func (r Result) StatusCode(statusCode *int) Result {
1087 *statusCode = r.statusCode
1088 return r
1089}
1090
1091// Into stores the result into obj, if possible. If obj is nil it is ignored.
1092// If the returned object is of type Status and has .Status != StatusSuccess, the
1093// additional information in Status will be used to enrich the error.
1094func (r Result) Into(obj runtime.Object) error {
1095 if r.err != nil {
1096 // Check whether the result has a Status object in the body and prefer that.
1097 return r.Error()
1098 }
1099 if r.decoder == nil {
1100 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1101 }
1102 if len(r.body) == 0 {
1103 return fmt.Errorf("0-length response")
1104 }
1105
1106 out, _, err := r.decoder.Decode(r.body, nil, obj)
1107 if err != nil || out == obj {
1108 return err
1109 }
1110 // if a different object is returned, see if it is Status and avoid double decoding
1111 // the object.
1112 switch t := out.(type) {
1113 case *metav1.Status:
1114 // any status besides StatusSuccess is considered an error.
1115 if t.Status != metav1.StatusSuccess {
1116 return errors.FromObject(t)
1117 }
1118 }
1119 return nil
1120}
1121
1122// WasCreated updates the provided bool pointer to whether the server returned
1123// 201 created or a different response.
1124func (r Result) WasCreated(wasCreated *bool) Result {
1125 *wasCreated = r.statusCode == http.StatusCreated
1126 return r
1127}
1128
1129// Error returns the error executing the request, nil if no error occurred.
1130// If the returned object is of type Status and has Status != StatusSuccess, the
1131// additional information in Status will be used to enrich the error.
1132// See the Request.Do() comment for what errors you might get.
1133func (r Result) Error() error {
1134 // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1135 // a Status object.
1136 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1137 return r.err
1138 }
1139
1140 // attempt to convert the body into a Status object
1141 // to be backwards compatible with old servers that do not return a version, default to "v1"
1142 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1143 if err != nil {
1144 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1145 return r.err
1146 }
1147 switch t := out.(type) {
1148 case *metav1.Status:
1149 // because we default the kind, we *must* check for StatusFailure
1150 if t.Status == metav1.StatusFailure {
1151 return errors.FromObject(t)
1152 }
1153 }
1154 return r.err
1155}
1156
var (
	// NameMayNotBe specifies strings that cannot be used as names specified as path
	// segments (like the REST API or etcd store).
	NameMayNotBe = []string{".", ".."}

	// NameMayNotContain specifies substrings that cannot be used in names specified as
	// path segments (like the REST API or etcd store).
	NameMayNotContain = []string{"/", "%"}
)
1162
1163// IsValidPathSegmentName validates the name can be safely encoded as a path segment
1164func IsValidPathSegmentName(name string) []string {
1165 for _, illegalName := range NameMayNotBe {
1166 if name == illegalName {
1167 return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1168 }
1169 }
1170
1171 var errors []string
1172 for _, illegalContent := range NameMayNotContain {
1173 if strings.Contains(name, illegalContent) {
1174 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1175 }
1176 }
1177
1178 return errors
1179}
1180
1181// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
1182// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
1183func IsValidPathSegmentPrefix(name string) []string {
1184 var errors []string
1185 for _, illegalContent := range NameMayNotContain {
1186 if strings.Contains(name, illegalContent) {
1187 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1188 }
1189 }
1190
1191 return errors
1192}
1193
1194// ValidatePathSegmentName validates the name can be safely encoded as a path segment
1195func ValidatePathSegmentName(name string, prefix bool) []string {
1196 if prefix {
1197 return IsValidPathSegmentPrefix(name)
1198 } else {
1199 return IsValidPathSegmentName(name)
1200 }
1201}