blob: 0ed7def73e7805ddca78d4d672eba9ecbd8adb21 [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package rest
18
19import (
20 "bytes"
21 "context"
22 "encoding/hex"
23 "fmt"
24 "io"
25 "io/ioutil"
26 "mime"
27 "net/http"
28 "net/url"
29 "path"
30 "reflect"
31 "strconv"
32 "strings"
33 "sync"
34 "time"
35
36 "golang.org/x/net/http2"
37 "k8s.io/apimachinery/pkg/api/errors"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
42 utilclock "k8s.io/apimachinery/pkg/util/clock"
43 "k8s.io/apimachinery/pkg/util/net"
44 "k8s.io/apimachinery/pkg/watch"
45 restclientwatch "k8s.io/client-go/rest/watch"
46 "k8s.io/client-go/tools/metrics"
47 "k8s.io/client-go/util/flowcontrol"
48 "k8s.io/klog/v2"
49)
50
51var (
52 // longThrottleLatency defines threshold for logging requests. All requests being
53 // throttled (via the provided rateLimiter) for more than longThrottleLatency will
54 // be logged.
55 longThrottleLatency = 50 * time.Millisecond
56
57 // extraLongThrottleLatency defines the threshold for logging requests at log level 2.
58 extraLongThrottleLatency = 1 * time.Second
59)
60
61// HTTPClient is an interface for testing a request object.
62type HTTPClient interface {
63 Do(req *http.Request) (*http.Response, error)
64}
65
66// ResponseWrapper is an interface for getting a response.
67// The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
68type ResponseWrapper interface {
69 DoRaw(context.Context) ([]byte, error)
70 Stream(context.Context) (io.ReadCloser, error)
71}
72
73// RequestConstructionError is returned when there's an error assembling a request.
74type RequestConstructionError struct {
75 Err error
76}
77
78// Error returns a textual description of 'r'.
79func (r *RequestConstructionError) Error() string {
80 return fmt.Sprintf("request construction error: '%v'", r.Err)
81}
82
83var noBackoff = &NoBackoff{}
84
85// Request allows for building up a request to a server in a chained fashion.
86// Any errors are stored until the end of your call, so you only have to
87// check once.
88type Request struct {
89 c *RESTClient
90
91 warningHandler WarningHandler
92
93 rateLimiter flowcontrol.RateLimiter
94 backoff BackoffManager
95 timeout time.Duration
96 maxRetries int
97
98 // generic components accessible via method setters
99 verb string
100 pathPrefix string
101 subpath string
102 params url.Values
103 headers http.Header
104
105 // structural elements of the request that are part of the Kubernetes API conventions
106 namespace string
107 namespaceSet bool
108 resource string
109 resourceName string
110 subresource string
111
112 // output
113 err error
114 body io.Reader
115}
116
117// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
118func NewRequest(c *RESTClient) *Request {
119 var backoff BackoffManager
120 if c.createBackoffMgr != nil {
121 backoff = c.createBackoffMgr()
122 }
123 if backoff == nil {
124 backoff = noBackoff
125 }
126
127 var pathPrefix string
128 if c.base != nil {
129 pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
130 } else {
131 pathPrefix = path.Join("/", c.versionedAPIPath)
132 }
133
134 var timeout time.Duration
135 if c.Client != nil {
136 timeout = c.Client.Timeout
137 }
138
139 r := &Request{
140 c: c,
141 rateLimiter: c.rateLimiter,
142 backoff: backoff,
143 timeout: timeout,
144 pathPrefix: pathPrefix,
145 maxRetries: 10,
146 warningHandler: c.warningHandler,
147 }
148
149 switch {
150 case len(c.content.AcceptContentTypes) > 0:
151 r.SetHeader("Accept", c.content.AcceptContentTypes)
152 case len(c.content.ContentType) > 0:
153 r.SetHeader("Accept", c.content.ContentType+", */*")
154 }
155 return r
156}
157
158// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
159func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
160 return NewRequest(&RESTClient{
161 base: base,
162 versionedAPIPath: versionedAPIPath,
163 content: content,
164 Client: client,
165 })
166}
167
168// Verb sets the verb this request will use.
169func (r *Request) Verb(verb string) *Request {
170 r.verb = verb
171 return r
172}
173
174// Prefix adds segments to the relative beginning to the request path. These
175// items will be placed before the optional Namespace, Resource, or Name sections.
176// Setting AbsPath will clear any previously set Prefix segments
177func (r *Request) Prefix(segments ...string) *Request {
178 if r.err != nil {
179 return r
180 }
181 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
182 return r
183}
184
185// Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
186// Namespace, Resource, or Name sections.
187func (r *Request) Suffix(segments ...string) *Request {
188 if r.err != nil {
189 return r
190 }
191 r.subpath = path.Join(r.subpath, path.Join(segments...))
192 return r
193}
194
195// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
196func (r *Request) Resource(resource string) *Request {
197 if r.err != nil {
198 return r
199 }
200 if len(r.resource) != 0 {
201 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
202 return r
203 }
204 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
205 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
206 return r
207 }
208 r.resource = resource
209 return r
210}
211
212// BackOff sets the request's backoff manager to the one specified,
213// or defaults to the stub implementation if nil is provided
214func (r *Request) BackOff(manager BackoffManager) *Request {
215 if manager == nil {
216 r.backoff = &NoBackoff{}
217 return r
218 }
219
220 r.backoff = manager
221 return r
222}
223
224// WarningHandler sets the handler this client uses when warning headers are encountered.
225// If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
226func (r *Request) WarningHandler(handler WarningHandler) *Request {
227 r.warningHandler = handler
228 return r
229}
230
231// Throttle receives a rate-limiter and sets or replaces an existing request limiter
232func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
233 r.rateLimiter = limiter
234 return r
235}
236
237// SubResource sets a sub-resource path which can be multiple segments after the resource
238// name but before the suffix.
239func (r *Request) SubResource(subresources ...string) *Request {
240 if r.err != nil {
241 return r
242 }
243 subresource := path.Join(subresources...)
244 if len(r.subresource) != 0 {
245 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.resource, subresource)
246 return r
247 }
248 for _, s := range subresources {
249 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
250 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
251 return r
252 }
253 }
254 r.subresource = subresource
255 return r
256}
257
258// Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
259func (r *Request) Name(resourceName string) *Request {
260 if r.err != nil {
261 return r
262 }
263 if len(resourceName) == 0 {
264 r.err = fmt.Errorf("resource name may not be empty")
265 return r
266 }
267 if len(r.resourceName) != 0 {
268 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
269 return r
270 }
271 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
272 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
273 return r
274 }
275 r.resourceName = resourceName
276 return r
277}
278
279// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
280func (r *Request) Namespace(namespace string) *Request {
281 if r.err != nil {
282 return r
283 }
284 if r.namespaceSet {
285 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
286 return r
287 }
288 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
289 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
290 return r
291 }
292 r.namespaceSet = true
293 r.namespace = namespace
294 return r
295}
296
297// NamespaceIfScoped is a convenience function to set a namespace if scoped is true
298func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
299 if scoped {
300 return r.Namespace(namespace)
301 }
302 return r
303}
304
305// AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
306// when a single segment is passed.
307func (r *Request) AbsPath(segments ...string) *Request {
308 if r.err != nil {
309 return r
310 }
311 r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
312 if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
313 // preserve any trailing slashes for legacy behavior
314 r.pathPrefix += "/"
315 }
316 return r
317}
318
319// RequestURI overwrites existing path and parameters with the value of the provided server relative
320// URI.
321func (r *Request) RequestURI(uri string) *Request {
322 if r.err != nil {
323 return r
324 }
325 locator, err := url.Parse(uri)
326 if err != nil {
327 r.err = err
328 return r
329 }
330 r.pathPrefix = locator.Path
331 if len(locator.Query()) > 0 {
332 if r.params == nil {
333 r.params = make(url.Values)
334 }
335 for k, v := range locator.Query() {
336 r.params[k] = v
337 }
338 }
339 return r
340}
341
342// Param creates a query parameter with the given string value.
343func (r *Request) Param(paramName, s string) *Request {
344 if r.err != nil {
345 return r
346 }
347 return r.setParam(paramName, s)
348}
349
350// VersionedParams will take the provided object, serialize it to a map[string][]string using the
351// implicit RESTClient API version and the default parameter codec, and then add those as parameters
352// to the request. Use this to provide versioned query parameters from client libraries.
353// VersionedParams will not write query parameters that have omitempty set and are empty. If a
354// parameter has already been set it is appended to (Params and VersionedParams are additive).
355func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
356 return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
357}
358
359func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
360 if r.err != nil {
361 return r
362 }
363 params, err := codec.EncodeParameters(obj, version)
364 if err != nil {
365 r.err = err
366 return r
367 }
368 for k, v := range params {
369 if r.params == nil {
370 r.params = make(url.Values)
371 }
372 r.params[k] = append(r.params[k], v...)
373 }
374 return r
375}
376
377func (r *Request) setParam(paramName, value string) *Request {
378 if r.params == nil {
379 r.params = make(url.Values)
380 }
381 r.params[paramName] = append(r.params[paramName], value)
382 return r
383}
384
385func (r *Request) SetHeader(key string, values ...string) *Request {
386 if r.headers == nil {
387 r.headers = http.Header{}
388 }
389 r.headers.Del(key)
390 for _, value := range values {
391 r.headers.Add(key, value)
392 }
393 return r
394}
395
396// Timeout makes the request use the given duration as an overall timeout for the
397// request. Additionally, if set passes the value as "timeout" parameter in URL.
398func (r *Request) Timeout(d time.Duration) *Request {
399 if r.err != nil {
400 return r
401 }
402 r.timeout = d
403 return r
404}
405
406// MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
407// "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
408// function is specifically called with a different value.
409// A zero maxRetries prevent it from doing retires and return an error immediately.
410func (r *Request) MaxRetries(maxRetries int) *Request {
411 if maxRetries < 0 {
412 maxRetries = 0
413 }
414 r.maxRetries = maxRetries
415 return r
416}
417
418// Body makes the request use obj as the body. Optional.
419// If obj is a string, try to read a file of that name.
420// If obj is a []byte, send it directly.
421// If obj is an io.Reader, use it directly.
422// If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
423// If obj is a runtime.Object and nil, do nothing.
424// Otherwise, set an error.
425func (r *Request) Body(obj interface{}) *Request {
426 if r.err != nil {
427 return r
428 }
429 switch t := obj.(type) {
430 case string:
431 data, err := ioutil.ReadFile(t)
432 if err != nil {
433 r.err = err
434 return r
435 }
436 glogBody("Request Body", data)
437 r.body = bytes.NewReader(data)
438 case []byte:
439 glogBody("Request Body", t)
440 r.body = bytes.NewReader(t)
441 case io.Reader:
442 r.body = t
443 case runtime.Object:
444 // callers may pass typed interface pointers, therefore we must check nil with reflection
445 if reflect.ValueOf(t).IsNil() {
446 return r
447 }
448 encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
449 if err != nil {
450 r.err = err
451 return r
452 }
453 data, err := runtime.Encode(encoder, t)
454 if err != nil {
455 r.err = err
456 return r
457 }
458 glogBody("Request Body", data)
459 r.body = bytes.NewReader(data)
460 r.SetHeader("Content-Type", r.c.content.ContentType)
461 default:
462 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
463 }
464 return r
465}
466
467// URL returns the current working URL.
468func (r *Request) URL() *url.URL {
469 p := r.pathPrefix
470 if r.namespaceSet && len(r.namespace) > 0 {
471 p = path.Join(p, "namespaces", r.namespace)
472 }
473 if len(r.resource) != 0 {
474 p = path.Join(p, strings.ToLower(r.resource))
475 }
476 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
477 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
478 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
479 }
480
481 finalURL := &url.URL{}
482 if r.c.base != nil {
483 *finalURL = *r.c.base
484 }
485 finalURL.Path = p
486
487 query := url.Values{}
488 for key, values := range r.params {
489 for _, value := range values {
490 query.Add(key, value)
491 }
492 }
493
494 // timeout is handled specially here.
495 if r.timeout != 0 {
496 query.Set("timeout", r.timeout.String())
497 }
498 finalURL.RawQuery = query.Encode()
499 return finalURL
500}
501
502// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
503// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
504// parameters will be reset. This creates a copy of the url so as not to change the
505// underlying object.
506func (r Request) finalURLTemplate() url.URL {
507 newParams := url.Values{}
508 v := []string{"{value}"}
509 for k := range r.params {
510 newParams[k] = v
511 }
512 r.params = newParams
513 url := r.URL()
514 segments := strings.Split(r.URL().Path, "/")
515 groupIndex := 0
516 index := 0
517 if r.URL() != nil && r.c.base != nil && strings.Contains(r.URL().Path, r.c.base.Path) {
518 groupIndex += len(strings.Split(r.c.base.Path, "/"))
519 }
520 if groupIndex >= len(segments) {
521 return *url
522 }
523
524 const CoreGroupPrefix = "api"
525 const NamedGroupPrefix = "apis"
526 isCoreGroup := segments[groupIndex] == CoreGroupPrefix
527 isNamedGroup := segments[groupIndex] == NamedGroupPrefix
528 if isCoreGroup {
529 // checking the case of core group with /api/v1/... format
530 index = groupIndex + 2
531 } else if isNamedGroup {
532 // checking the case of named group with /apis/apps/v1/... format
533 index = groupIndex + 3
534 } else {
535 // this should not happen that the only two possibilities are /api... and /apis..., just want to put an
536 // outlet here in case more API groups are added in future if ever possible:
537 // https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
538 // if a wrong API groups name is encountered, return the {prefix} for url.Path
539 url.Path = "/{prefix}"
540 url.RawQuery = ""
541 return *url
542 }
543 //switch segLength := len(segments) - index; segLength {
544 switch {
545 // case len(segments) - index == 1:
546 // resource (with no name) do nothing
547 case len(segments)-index == 2:
548 // /$RESOURCE/$NAME: replace $NAME with {name}
549 segments[index+1] = "{name}"
550 case len(segments)-index == 3:
551 if segments[index+2] == "finalize" || segments[index+2] == "status" {
552 // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
553 segments[index+1] = "{name}"
554 } else {
555 // /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
556 segments[index+1] = "{namespace}"
557 }
558 case len(segments)-index >= 4:
559 segments[index+1] = "{namespace}"
560 // /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
561 if segments[index+3] != "finalize" && segments[index+3] != "status" {
562 // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
563 segments[index+3] = "{name}"
564 }
565 }
566 url.Path = path.Join(segments...)
567 return *url
568}
569
570func (r *Request) tryThrottle(ctx context.Context) error {
571 if r.rateLimiter == nil {
572 return nil
573 }
574
575 now := time.Now()
576
577 err := r.rateLimiter.Wait(ctx)
578
579 latency := time.Since(now)
580 if latency > longThrottleLatency {
581 klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
582 }
583 if latency > extraLongThrottleLatency {
584 // If the rate limiter latency is very high, the log message should be printed at a higher log level,
585 // but we use a throttled logger to prevent spamming.
586 globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
587 }
588 metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
589
590 return err
591}
592
593type throttleSettings struct {
594 logLevel klog.Level
595 minLogInterval time.Duration
596
597 lastLogTime time.Time
598 lock sync.RWMutex
599}
600
601type throttledLogger struct {
602 clock utilclock.PassiveClock
603 settings []*throttleSettings
604}
605
606var globalThrottledLogger = &throttledLogger{
607 clock: utilclock.RealClock{},
608 settings: []*throttleSettings{
609 {
610 logLevel: 2,
611 minLogInterval: 1 * time.Second,
612 }, {
613 logLevel: 0,
614 minLogInterval: 10 * time.Second,
615 },
616 },
617}
618
619func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
620 for _, setting := range b.settings {
621 if bool(klog.V(setting.logLevel).Enabled()) {
622 // Return early without write locking if possible.
623 if func() bool {
624 setting.lock.RLock()
625 defer setting.lock.RUnlock()
626 return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
627 }() {
628 setting.lock.Lock()
629 defer setting.lock.Unlock()
630 if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
631 setting.lastLogTime = b.clock.Now()
632 return setting.logLevel, true
633 }
634 }
635 return -1, false
636 }
637 }
638 return -1, false
639}
640
641// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
642// as long as it hasn't written a log message more recently than minLogInterval.
643func (b *throttledLogger) Infof(message string, args ...interface{}) {
644 if logLevel, ok := b.attemptToLog(); ok {
645 klog.V(logLevel).Infof(message, args...)
646 }
647}
648
649// Watch attempts to begin watching the requested location.
650// Returns a watch.Interface, or an error.
651func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
652 // We specifically don't want to rate limit watches, so we
653 // don't use r.rateLimiter here.
654 if r.err != nil {
655 return nil, r.err
656 }
657
658 url := r.URL().String()
659 req, err := http.NewRequest(r.verb, url, r.body)
660 if err != nil {
661 return nil, err
662 }
663 req = req.WithContext(ctx)
664 req.Header = r.headers
665 client := r.c.Client
666 if client == nil {
667 client = http.DefaultClient
668 }
669 r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
670 resp, err := client.Do(req)
671 updateURLMetrics(r, resp, err)
672 if r.c.base != nil {
673 if err != nil {
674 r.backoff.UpdateBackoff(r.c.base, err, 0)
675 } else {
676 r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
677 }
678 }
679 if err != nil {
680 // The watch stream mechanism handles many common partial data errors, so closed
681 // connections can be retried in many cases.
682 if net.IsProbableEOF(err) || net.IsTimeout(err) {
683 return watch.NewEmptyWatch(), nil
684 }
685 return nil, err
686 }
687 if resp.StatusCode != http.StatusOK {
688 defer resp.Body.Close()
689 if result := r.transformResponse(resp, req); result.err != nil {
690 return nil, result.err
691 }
692 return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
693 }
694
695 contentType := resp.Header.Get("Content-Type")
696 mediaType, params, err := mime.ParseMediaType(contentType)
697 if err != nil {
698 klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
699 }
700 objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
701 if err != nil {
702 return nil, err
703 }
704
705 handleWarnings(resp.Header, r.warningHandler)
706
707 frameReader := framer.NewFrameReader(resp.Body)
708 watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
709
710 return watch.NewStreamWatcher(
711 restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
712 // use 500 to indicate that the cause of the error is unknown - other error codes
713 // are more specific to HTTP interactions, and set a reason
714 errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
715 ), nil
716}
717
718// updateURLMetrics is a convenience function for pushing metrics.
719// It also handles corner cases for incomplete/invalid request data.
720func updateURLMetrics(req *Request, resp *http.Response, err error) {
721 url := "none"
722 if req.c.base != nil {
723 url = req.c.base.Host
724 }
725
726 // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
727 // system so we just report them as `<error>`.
728 if err != nil {
729 metrics.RequestResult.Increment("<error>", req.verb, url)
730 } else {
731 //Metrics for failure codes
732 metrics.RequestResult.Increment(strconv.Itoa(resp.StatusCode), req.verb, url)
733 }
734}
735
736// Stream formats and executes the request, and offers streaming of the response.
737// Returns io.ReadCloser which could be used for streaming of the response, or an error
738// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
739// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
740func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
741 if r.err != nil {
742 return nil, r.err
743 }
744
745 if err := r.tryThrottle(ctx); err != nil {
746 return nil, err
747 }
748
749 url := r.URL().String()
750 req, err := http.NewRequest(r.verb, url, nil)
751 if err != nil {
752 return nil, err
753 }
754 if r.body != nil {
755 req.Body = ioutil.NopCloser(r.body)
756 }
757 req = req.WithContext(ctx)
758 req.Header = r.headers
759 client := r.c.Client
760 if client == nil {
761 client = http.DefaultClient
762 }
763 r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
764 resp, err := client.Do(req)
765 updateURLMetrics(r, resp, err)
766 if r.c.base != nil {
767 if err != nil {
768 r.backoff.UpdateBackoff(r.URL(), err, 0)
769 } else {
770 r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
771 }
772 }
773 if err != nil {
774 return nil, err
775 }
776
777 switch {
778 case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
779 handleWarnings(resp.Header, r.warningHandler)
780 return resp.Body, nil
781
782 default:
783 // ensure we close the body before returning the error
784 defer resp.Body.Close()
785
786 result := r.transformResponse(resp, req)
787 err := result.Error()
788 if err == nil {
789 err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
790 }
791 return nil, err
792 }
793}
794
795// requestPreflightCheck looks for common programmer errors on Request.
796//
797// We tackle here two programmer mistakes. The first one is to try to create
798// something(POST) using an empty string as namespace with namespaceSet as
799// true. If namespaceSet is true then namespace should also be defined. The
800// second mistake is, when under the same circumstances, the programmer tries
801// to GET, PUT or DELETE a named resource(resourceName != ""), again, if
802// namespaceSet is true then namespace must not be empty.
803func (r *Request) requestPreflightCheck() error {
804 if !r.namespaceSet {
805 return nil
806 }
807 if len(r.namespace) > 0 {
808 return nil
809 }
810
811 switch r.verb {
812 case "POST":
813 return fmt.Errorf("an empty namespace may not be set during creation")
814 case "GET", "PUT", "DELETE":
815 if len(r.resourceName) > 0 {
816 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
817 }
818 }
819 return nil
820}
821
822// request connects to the server and invokes the provided function when a server response is
823// received. It handles retry behavior and up front validation of requests. It will invoke
824// fn at most once. It will return an error if a problem occurred prior to connecting to the
825// server - the provided function is responsible for handling server errors.
826func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
827 //Metrics for total request latency
828 start := time.Now()
829 defer func() {
830 metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
831 }()
832
833 if r.err != nil {
834 klog.V(4).Infof("Error in request: %v", r.err)
835 return r.err
836 }
837
838 if err := r.requestPreflightCheck(); err != nil {
839 return err
840 }
841
842 client := r.c.Client
843 if client == nil {
844 client = http.DefaultClient
845 }
846
847 // Throttle the first try before setting up the timeout configured on the
848 // client. We don't want a throttled client to return timeouts to callers
849 // before it makes a single request.
850 if err := r.tryThrottle(ctx); err != nil {
851 return err
852 }
853
854 if r.timeout > 0 {
855 var cancel context.CancelFunc
856 ctx, cancel = context.WithTimeout(ctx, r.timeout)
857 defer cancel()
858 }
859
860 // Right now we make about ten retry attempts if we get a Retry-After response.
861 retries := 0
862 for {
863
864 url := r.URL().String()
865 req, err := http.NewRequest(r.verb, url, r.body)
866 if err != nil {
867 return err
868 }
869 req = req.WithContext(ctx)
870 req.Header = r.headers
871
872 r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
873 if retries > 0 {
874 // We are retrying the request that we already send to apiserver
875 // at least once before.
876 // This request should also be throttled with the client-internal rate limiter.
877 if err := r.tryThrottle(ctx); err != nil {
878 return err
879 }
880 }
881 resp, err := client.Do(req)
882 updateURLMetrics(r, resp, err)
883 if err != nil {
884 r.backoff.UpdateBackoff(r.URL(), err, 0)
885 } else {
886 r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
887 }
888 if err != nil {
889 // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
890 // Thus in case of "GET" operations, we simply retry it.
891 // We are not automatically retrying "write" operations, as
892 // they are not idempotent.
893 if r.verb != "GET" {
894 return err
895 }
896 // For connection errors and apiserver shutdown errors retry.
897 if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
898 // For the purpose of retry, we set the artificial "retry-after" response.
899 // TODO: Should we clean the original response if it exists?
900 resp = &http.Response{
901 StatusCode: http.StatusInternalServerError,
902 Header: http.Header{"Retry-After": []string{"1"}},
903 Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
904 }
905 } else {
906 return err
907 }
908 }
909
910 done := func() bool {
911 // Ensure the response body is fully read and closed
912 // before we reconnect, so that we reuse the same TCP
913 // connection.
914 defer func() {
915 const maxBodySlurpSize = 2 << 10
916 if resp.ContentLength <= maxBodySlurpSize {
917 io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
918 }
919 resp.Body.Close()
920 }()
921
922 retries++
923 if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
924 if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
925 _, err := seeker.Seek(0, 0)
926 if err != nil {
927 klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
928 fn(req, resp)
929 return true
930 }
931 }
932
933 klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
934 r.backoff.Sleep(time.Duration(seconds) * time.Second)
935 return false
936 }
937 fn(req, resp)
938 return true
939 }()
940 if done {
941 return nil
942 }
943 }
944}
945
946// Do formats and executes the request. Returns a Result object for easy response
947// processing.
948//
949// Error type:
950// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
951// * http.Client.Do errors are returned directly.
952func (r *Request) Do(ctx context.Context) Result {
953 var result Result
954 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
955 result = r.transformResponse(resp, req)
956 })
957 if err != nil {
958 return Result{err: err}
959 }
960 return result
961}
962
963// DoRaw executes the request but does not process the response body.
964func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
965 var result Result
966 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
967 result.body, result.err = ioutil.ReadAll(resp.Body)
968 glogBody("Response Body", result.body)
969 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
970 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
971 }
972 })
973 if err != nil {
974 return nil, err
975 }
976 return result.body, result.err
977}
978
979// transformResponse converts an API response into a structured API object
980func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
981 var body []byte
982 if resp.Body != nil {
983 data, err := ioutil.ReadAll(resp.Body)
984 switch err.(type) {
985 case nil:
986 body = data
987 case http2.StreamError:
988 // This is trying to catch the scenario that the server may close the connection when sending the
989 // response body. This can be caused by server timeout due to a slow network connection.
990 // TODO: Add test for this. Steps may be:
991 // 1. client-go (or kubectl) sends a GET request.
992 // 2. Apiserver sends back the headers and then part of the body
993 // 3. Apiserver closes connection.
994 // 4. client-go should catch this and return an error.
995 klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
996 streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
997 return Result{
998 err: streamErr,
999 }
1000 default:
1001 klog.Errorf("Unexpected error when reading response body: %v", err)
1002 unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
1003 return Result{
1004 err: unexpectedErr,
1005 }
1006 }
1007 }
1008
1009 glogBody("Response Body", body)
1010
1011 // verify the content type is accurate
1012 var decoder runtime.Decoder
1013 contentType := resp.Header.Get("Content-Type")
1014 if len(contentType) == 0 {
1015 contentType = r.c.content.ContentType
1016 }
1017 if len(contentType) > 0 {
1018 var err error
1019 mediaType, params, err := mime.ParseMediaType(contentType)
1020 if err != nil {
1021 return Result{err: errors.NewInternalError(err)}
1022 }
1023 decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
1024 if err != nil {
1025 // if we fail to negotiate a decoder, treat this as an unstructured error
1026 switch {
1027 case resp.StatusCode == http.StatusSwitchingProtocols:
1028 // no-op, we've been upgraded
1029 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
1030 return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
1031 }
1032 return Result{
1033 body: body,
1034 contentType: contentType,
1035 statusCode: resp.StatusCode,
1036 warnings: handleWarnings(resp.Header, r.warningHandler),
1037 }
1038 }
1039 }
1040
1041 switch {
1042 case resp.StatusCode == http.StatusSwitchingProtocols:
1043 // no-op, we've been upgraded
1044 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
1045 // calculate an unstructured error from the response which the Result object may use if the caller
1046 // did not return a structured error.
1047 retryAfter, _ := retryAfterSeconds(resp)
1048 err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
1049 return Result{
1050 body: body,
1051 contentType: contentType,
1052 statusCode: resp.StatusCode,
1053 decoder: decoder,
1054 err: err,
1055 warnings: handleWarnings(resp.Header, r.warningHandler),
1056 }
1057 }
1058
1059 return Result{
1060 body: body,
1061 contentType: contentType,
1062 statusCode: resp.StatusCode,
1063 decoder: decoder,
1064 warnings: handleWarnings(resp.Header, r.warningHandler),
1065 }
1066}
1067
1068// truncateBody decides if the body should be truncated, based on the glog Verbosity.
1069func truncateBody(body string) string {
1070 max := 0
1071 switch {
1072 case bool(klog.V(10).Enabled()):
1073 return body
1074 case bool(klog.V(9).Enabled()):
1075 max = 10240
1076 case bool(klog.V(8).Enabled()):
1077 max = 1024
1078 }
1079
1080 if len(body) <= max {
1081 return body
1082 }
1083
1084 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
1085}
1086
1087// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
1088// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
1089// whether the body is printable.
1090func glogBody(prefix string, body []byte) {
1091 if klog.V(8).Enabled() {
1092 if bytes.IndexFunc(body, func(r rune) bool {
1093 return r < 0x0a
1094 }) != -1 {
1095 klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
1096 } else {
1097 klog.Infof("%s: %s", prefix, truncateBody(string(body)))
1098 }
1099 }
1100}
1101
1102// maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
1103const maxUnstructuredResponseTextBytes = 2048
1104
1105// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
1106// It is expected to transform any response that is not recognizable as a clear server sent error from the
1107// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
1108// introduce a level of uncertainty to the responses returned by servers that in common use result in
1109// unexpected responses. The rough structure is:
1110//
1111// 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
1112// - this is the happy path
1113// - when you get this output, trust what the server sends
1114// 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
1115// generate a reasonable facsimile of the original failure.
1116// - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
1117// 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
1118// 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
1119// initial contact, the presence of mismatched body contents from posted content types
1120// - Give these a separate distinct error type and capture as much as possible of the original message
1121//
1122// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
1123func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
1124 if body == nil && resp.Body != nil {
1125 if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
1126 body = data
1127 }
1128 }
1129 retryAfter, _ := retryAfterSeconds(resp)
1130 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
1131}
1132
1133// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
1134func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
1135 // cap the amount of output we create
1136 if len(body) > maxUnstructuredResponseTextBytes {
1137 body = body[:maxUnstructuredResponseTextBytes]
1138 }
1139
1140 message := "unknown"
1141 if isTextResponse {
1142 message = strings.TrimSpace(string(body))
1143 }
1144 var groupResource schema.GroupResource
1145 if len(r.resource) > 0 {
1146 groupResource.Group = r.c.content.GroupVersion.Group
1147 groupResource.Resource = r.resource
1148 }
1149 return errors.NewGenericServerResponse(
1150 statusCode,
1151 method,
1152 groupResource,
1153 r.resourceName,
1154 message,
1155 retryAfter,
1156 true,
1157 )
1158}
1159
1160// isTextResponse returns true if the response appears to be a textual media type.
1161func isTextResponse(resp *http.Response) bool {
1162 contentType := resp.Header.Get("Content-Type")
1163 if len(contentType) == 0 {
1164 return true
1165 }
1166 media, _, err := mime.ParseMediaType(contentType)
1167 if err != nil {
1168 return false
1169 }
1170 return strings.HasPrefix(media, "text/")
1171}
1172
1173// checkWait returns true along with a number of seconds if the server instructed us to wait
1174// before retrying.
1175func checkWait(resp *http.Response) (int, bool) {
1176 switch r := resp.StatusCode; {
1177 // any 500 error code and 429 can trigger a wait
1178 case r == http.StatusTooManyRequests, r >= 500:
1179 default:
1180 return 0, false
1181 }
1182 i, ok := retryAfterSeconds(resp)
1183 return i, ok
1184}
1185
1186// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
1187// the header was missing or not a valid number.
1188func retryAfterSeconds(resp *http.Response) (int, bool) {
1189 if h := resp.Header.Get("Retry-After"); len(h) > 0 {
1190 if i, err := strconv.Atoi(h); err == nil {
1191 return i, true
1192 }
1193 }
1194 return 0, false
1195}
1196
1197// Result contains the result of calling Request.Do().
1198type Result struct {
1199 body []byte
1200 warnings []net.WarningHeader
1201 contentType string
1202 err error
1203 statusCode int
1204
1205 decoder runtime.Decoder
1206}
1207
1208// Raw returns the raw result.
1209func (r Result) Raw() ([]byte, error) {
1210 return r.body, r.err
1211}
1212
1213// Get returns the result as an object, which means it passes through the decoder.
1214// If the returned object is of type Status and has .Status != StatusSuccess, the
1215// additional information in Status will be used to enrich the error.
1216func (r Result) Get() (runtime.Object, error) {
1217 if r.err != nil {
1218 // Check whether the result has a Status object in the body and prefer that.
1219 return nil, r.Error()
1220 }
1221 if r.decoder == nil {
1222 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1223 }
1224
1225 // decode, but if the result is Status return that as an error instead.
1226 out, _, err := r.decoder.Decode(r.body, nil, nil)
1227 if err != nil {
1228 return nil, err
1229 }
1230 switch t := out.(type) {
1231 case *metav1.Status:
1232 // any status besides StatusSuccess is considered an error.
1233 if t.Status != metav1.StatusSuccess {
1234 return nil, errors.FromObject(t)
1235 }
1236 }
1237 return out, nil
1238}
1239
1240// StatusCode returns the HTTP status code of the request. (Only valid if no
1241// error was returned.)
1242func (r Result) StatusCode(statusCode *int) Result {
1243 *statusCode = r.statusCode
1244 return r
1245}
1246
1247// Into stores the result into obj, if possible. If obj is nil it is ignored.
1248// If the returned object is of type Status and has .Status != StatusSuccess, the
1249// additional information in Status will be used to enrich the error.
1250func (r Result) Into(obj runtime.Object) error {
1251 if r.err != nil {
1252 // Check whether the result has a Status object in the body and prefer that.
1253 return r.Error()
1254 }
1255 if r.decoder == nil {
1256 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1257 }
1258 if len(r.body) == 0 {
1259 return fmt.Errorf("0-length response with status code: %d and content type: %s",
1260 r.statusCode, r.contentType)
1261 }
1262
1263 out, _, err := r.decoder.Decode(r.body, nil, obj)
1264 if err != nil || out == obj {
1265 return err
1266 }
1267 // if a different object is returned, see if it is Status and avoid double decoding
1268 // the object.
1269 switch t := out.(type) {
1270 case *metav1.Status:
1271 // any status besides StatusSuccess is considered an error.
1272 if t.Status != metav1.StatusSuccess {
1273 return errors.FromObject(t)
1274 }
1275 }
1276 return nil
1277}
1278
1279// WasCreated updates the provided bool pointer to whether the server returned
1280// 201 created or a different response.
1281func (r Result) WasCreated(wasCreated *bool) Result {
1282 *wasCreated = r.statusCode == http.StatusCreated
1283 return r
1284}
1285
1286// Error returns the error executing the request, nil if no error occurred.
1287// If the returned object is of type Status and has Status != StatusSuccess, the
1288// additional information in Status will be used to enrich the error.
1289// See the Request.Do() comment for what errors you might get.
1290func (r Result) Error() error {
1291 // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
1292 // a Status object.
1293 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1294 return r.err
1295 }
1296
1297 // attempt to convert the body into a Status object
1298 // to be backwards compatible with old servers that do not return a version, default to "v1"
1299 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1300 if err != nil {
1301 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1302 return r.err
1303 }
1304 switch t := out.(type) {
1305 case *metav1.Status:
1306 // because we default the kind, we *must* check for StatusFailure
1307 if t.Status == metav1.StatusFailure {
1308 return errors.FromObject(t)
1309 }
1310 }
1311 return r.err
1312}
1313
1314// Warnings returns any warning headers received in the response
1315func (r Result) Warnings() []net.WarningHeader {
1316 return r.warnings
1317}
1318
1319// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
1320var NameMayNotBe = []string{".", ".."}
1321
1322// NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
1323var NameMayNotContain = []string{"/", "%"}
1324
1325// IsValidPathSegmentName validates the name can be safely encoded as a path segment
1326func IsValidPathSegmentName(name string) []string {
1327 for _, illegalName := range NameMayNotBe {
1328 if name == illegalName {
1329 return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1330 }
1331 }
1332
1333 var errors []string
1334 for _, illegalContent := range NameMayNotContain {
1335 if strings.Contains(name, illegalContent) {
1336 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1337 }
1338 }
1339
1340 return errors
1341}
1342
1343// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
1344// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
1345func IsValidPathSegmentPrefix(name string) []string {
1346 var errors []string
1347 for _, illegalContent := range NameMayNotContain {
1348 if strings.Contains(name, illegalContent) {
1349 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1350 }
1351 }
1352
1353 return errors
1354}
1355
1356// ValidatePathSegmentName validates the name can be safely encoded as a path segment
1357func ValidatePathSegmentName(name string, prefix bool) []string {
1358 if prefix {
1359 return IsValidPathSegmentPrefix(name)
1360 }
1361 return IsValidPathSegmentName(name)
1362}