/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package exec
18
19import (
20 "bytes"
21 "context"
22 "crypto/tls"
23 "crypto/x509"
24 "errors"
25 "fmt"
26 "io"
27 "net"
28 "net/http"
29 "os"
30 "os/exec"
31 "reflect"
32 "strings"
33 "sync"
34 "time"
35
36 "github.com/davecgh/go-spew/spew"
37 "golang.org/x/crypto/ssh/terminal"
38 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/runtime/serializer"
42 "k8s.io/apimachinery/pkg/util/clock"
43 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
44 "k8s.io/client-go/pkg/apis/clientauthentication"
45 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
46 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
47 "k8s.io/client-go/tools/clientcmd/api"
48 "k8s.io/client-go/tools/metrics"
49 "k8s.io/client-go/transport"
50 "k8s.io/client-go/util/connrotation"
51 "k8s.io/klog/v2"
52)
53
// execInfoEnv is the name of the environment variable in which the encoded
// ExecCredential input is handed to the plugin process (only set for the
// v1alpha1 API version, see refreshCredsLocked).
const execInfoEnv = "KUBERNETES_EXEC_INFO"

// onRotateListWarningLength is the number of rotation callbacks registered on
// a single Authenticator above which UpdateTransportConfig logs a warning.
const onRotateListWarningLength = 1000
56const installHintVerboseHelp = `
57
58It looks like you are trying to use a client-go credential plugin that is not installed.
59
60To learn more about this feature, consult the documentation available at:
61 https://kubernetes.io/docs/reference/access-authn-authz/authentication/#client-go-credential-plugins`
62
// scheme holds the type registrations used when encoding and decoding
// ExecCredential objects; it is populated in init.
var scheme = runtime.NewScheme()

// codecs is the codec factory built on top of scheme. It supplies the encoder
// and decoder used to talk to the plugin.
var codecs = serializer.NewCodecFactory(scheme)
65
66func init() {
67 v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
68 utilruntime.Must(v1alpha1.AddToScheme(scheme))
69 utilruntime.Must(v1beta1.AddToScheme(scheme))
70 utilruntime.Must(clientauthentication.AddToScheme(scheme))
71}
72
var (
	// Since transports can be constantly re-initialized by programs like kubectl,
	// keep a cache of initialized authenticators keyed by a string rendering of
	// their config (see cacheKey).
	globalCache = newCache()
	// The list of API versions we accept, keyed by the string form of the
	// GroupVersion. newAuthenticator rejects any config whose APIVersion is not
	// a key of this map.
	apiVersions = map[string]schema.GroupVersion{
		v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
		v1beta1.SchemeGroupVersion.String():  v1beta1.SchemeGroupVersion,
	}
)
83
84func newCache() *cache {
85 return &cache{m: make(map[string]*Authenticator)}
86}
87
// spewConfig is the go-spew configuration used to render an ExecConfig into a
// cache key.
// NOTE(review): DisableMethods presumably keeps spew from calling Stringer/error
// methods on the dumped values — confirm against the go-spew ConfigState docs.
var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}

// cacheKey returns the string under which the authenticator for c is stored in
// the cache: the spewConfig rendering of c.
func cacheKey(c *api.ExecConfig) string {
	return spewConfig.Sprint(c)
}
93
// cache maps cache keys (see cacheKey) to Authenticators. Both get and put
// take mu before touching m.
type cache struct {
	// mu guards m.
	mu sync.Mutex
	// m holds the cached authenticators by key.
	m map[string]*Authenticator
}
98
99func (c *cache) get(s string) (*Authenticator, bool) {
100 c.mu.Lock()
101 defer c.mu.Unlock()
102 a, ok := c.m[s]
103 return a, ok
104}
105
106// put inserts an authenticator into the cache. If an authenticator is already
107// associated with the key, the first one is returned instead.
108func (c *cache) put(s string, a *Authenticator) *Authenticator {
109 c.mu.Lock()
110 defer c.mu.Unlock()
111 existing, ok := c.m[s]
112 if ok {
113 return existing
114 }
115 c.m[s] = a
116 return a
117}
118
// sometimes rate limits how often a function f() is called. Specifically, Do()
// will run the provided function f() up to threshold times every interval
// duration.
type sometimes struct {
	// threshold is the maximum number of calls allowed per window.
	threshold int
	// interval is the length of one window.
	interval time.Duration

	// clock supplies the current time to Do.
	clock clock.Clock
	// mu is held for the whole of Do, including the call to f().
	mu sync.Mutex

	count  int       // times we have called f() in this window
	window time.Time // beginning of current window of length interval
}
132
133func (s *sometimes) Do(f func()) {
134 s.mu.Lock()
135 defer s.mu.Unlock()
136
137 now := s.clock.Now()
138 if s.window.IsZero() {
139 s.window = now
140 }
141
142 // If we are no longer in our saved time window, then we get to reset our run
143 // count back to 0 and start increasing towards the threshold again.
144 if inWindow := now.Sub(s.window) < s.interval; !inWindow {
145 s.window = now
146 s.count = 0
147 }
148
149 // If we have not run the function more than threshold times in this current
150 // time window, we get to run it now!
151 if underThreshold := s.count < s.threshold; underThreshold {
152 s.count++
153 f()
154 }
155}
156
// GetAuthenticator returns an exec-based plugin for providing client credentials.
//
// Authenticators are looked up in, and stored into, the package-wide
// globalCache, so calls with configs that produce the same cache key share one
// Authenticator. The error from newAuthenticator (for example an unsupported
// apiVersion) is returned unchanged.
func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
	return newAuthenticator(globalCache, config)
}
161
162func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
163 key := cacheKey(config)
164 if a, ok := c.get(key); ok {
165 return a, nil
166 }
167
168 gv, ok := apiVersions[config.APIVersion]
169 if !ok {
170 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
171 }
172
173 a := &Authenticator{
174 cmd: config.Command,
175 args: config.Args,
176 group: gv,
177
178 installHint: config.InstallHint,
179 sometimes: &sometimes{
180 threshold: 10,
181 interval: time.Hour,
182 clock: clock.RealClock{},
183 },
184
185 stdin: os.Stdin,
186 stderr: os.Stderr,
187 interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
188 now: time.Now,
189 environ: os.Environ,
190 }
191
192 for _, env := range config.Env {
193 a.env = append(a.env, env.Name+"="+env.Value)
194 }
195
196 return c.put(key, a), nil
197}
198
// Authenticator is a client credential provider that rotates credentials by executing a plugin.
// The plugin input and output are defined by the API group client.authentication.k8s.io.
type Authenticator struct {
	// Set by the config
	//
	// cmd and args are the plugin executable and its arguments, group is the
	// API version the plugin output must match, and env holds extra
	// "NAME=value" entries appended to the plugin's environment.
	cmd   string
	args  []string
	group schema.GroupVersion
	env   []string

	// Used to avoid log spew by rate limiting install hint printing. We didn't do
	// this by interval based rate limiting alone since that way may have prevented
	// the install hint from showing up for kubectl users.
	sometimes   *sometimes
	installHint string

	// Stubbable for testing
	//
	// stdin is only connected to the plugin when interactive is true; stderr
	// receives the plugin's standard error; now and environ supply the current
	// time and the base environment.
	stdin       io.Reader
	stderr      io.Writer
	interactive bool
	now         func() time.Time
	environ     func() []string

	// Cached results.
	//
	// The mutex also guards calling the plugin. Since the plugin could be
	// interactive we want to make sure it's only called once.
	mu          sync.Mutex
	cachedCreds *credentials
	// exp is the expiration reported by the plugin for cachedCreds; the zero
	// value means the credentials are never treated as expired.
	exp time.Time

	// onRotateList holds one callback per UpdateTransportConfig call; all of
	// them are run when a refresh yields a different client certificate.
	onRotateList []func()
}
231
// credentials is the result of one successful plugin run: a bearer token
// and/or a client certificate. Either field may be empty/nil, depending on what
// the plugin returned.
type credentials struct {
	// token is sent as "Authorization: Bearer <token>" when non-empty.
	token string
	// cert is the parsed client key pair, or nil when the plugin returned none.
	cert *tls.Certificate
}
236
237// UpdateTransportConfig updates the transport.Config to use credentials
238// returned by the plugin.
239func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
240 // If a bearer token is present in the request - avoid the GetCert callback when
241 // setting up the transport, as that triggers the exec action if the server is
242 // also configured to allow client certificates for authentication. For requests
243 // like "kubectl get --token (token) pods" we should assume the intention is to
244 // use the provided token for authentication.
245 if c.HasTokenAuth() {
246 return nil
247 }
248
249 c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
250 return &roundTripper{a, rt}
251 })
252
253 if c.TLS.GetCert != nil {
254 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
255 }
256 c.TLS.GetCert = a.cert
257
258 var dial func(ctx context.Context, network, addr string) (net.Conn, error)
259 if c.Dial != nil {
260 dial = c.Dial
261 } else {
262 dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
263 }
264 d := connrotation.NewDialer(dial)
265
266 a.mu.Lock()
267 defer a.mu.Unlock()
268 a.onRotateList = append(a.onRotateList, d.CloseAll)
269 onRotateListLength := len(a.onRotateList)
270 if onRotateListLength > onRotateListWarningLength {
271 klog.Warningf("constructing many client instances from the same exec auth config can cause performance problems during cert rotation and can exhaust available network connections; %d clients constructed calling %q", onRotateListLength, a.cmd)
272 }
273
274 c.Dial = d.DialContext
275
276 return nil
277}
278
// roundTripper is the http.RoundTripper installed by UpdateTransportConfig. It
// obtains credentials from a and delegates the actual request to base.
type roundTripper struct {
	a    *Authenticator
	base http.RoundTripper
}
283
284func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
285 // If a user has already set credentials, use that. This makes commands like
286 // "kubectl get --token (token) pods" work.
287 if req.Header.Get("Authorization") != "" {
288 return r.base.RoundTrip(req)
289 }
290
291 creds, err := r.a.getCreds()
292 if err != nil {
293 return nil, fmt.Errorf("getting credentials: %v", err)
294 }
295 if creds.token != "" {
296 req.Header.Set("Authorization", "Bearer "+creds.token)
297 }
298
299 res, err := r.base.RoundTrip(req)
300 if err != nil {
301 return nil, err
302 }
303 if res.StatusCode == http.StatusUnauthorized {
304 resp := &clientauthentication.Response{
305 Header: res.Header,
306 Code: int32(res.StatusCode),
307 }
308 if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
309 klog.Errorf("refreshing credentials: %v", err)
310 }
311 }
312 return res, nil
313}
314
315func (a *Authenticator) credsExpired() bool {
316 if a.exp.IsZero() {
317 return false
318 }
319 return a.now().After(a.exp)
320}
321
322func (a *Authenticator) cert() (*tls.Certificate, error) {
323 creds, err := a.getCreds()
324 if err != nil {
325 return nil, err
326 }
327 return creds.cert, nil
328}
329
330func (a *Authenticator) getCreds() (*credentials, error) {
331 a.mu.Lock()
332 defer a.mu.Unlock()
333
334 if a.cachedCreds != nil && !a.credsExpired() {
335 return a.cachedCreds, nil
336 }
337
338 if err := a.refreshCredsLocked(nil); err != nil {
339 return nil, err
340 }
341
342 return a.cachedCreds, nil
343}
344
345// maybeRefreshCreds executes the plugin to force a rotation of the
346// credentials, unless they were rotated already.
347func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
348 a.mu.Lock()
349 defer a.mu.Unlock()
350
351 // Since we're not making a new pointer to a.cachedCreds in getCreds, no
352 // need to do deep comparison.
353 if creds != a.cachedCreds {
354 // Credentials already rotated.
355 return nil
356 }
357
358 return a.refreshCredsLocked(r)
359}
360
361// refreshCredsLocked executes the plugin and reads the credentials from
362// stdout. It must be called while holding the Authenticator's mutex.
363func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
364 cred := &clientauthentication.ExecCredential{
365 Spec: clientauthentication.ExecCredentialSpec{
366 Response: r,
367 Interactive: a.interactive,
368 },
369 }
370
371 env := append(a.environ(), a.env...)
372 if a.group == v1alpha1.SchemeGroupVersion {
373 // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
374 // someone wants it back.
375 //
376 // See: https://github.com/kubernetes/kubernetes/issues/61796
377 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
378 if err != nil {
379 return fmt.Errorf("encode ExecCredentials: %v", err)
380 }
381 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
382 }
383
384 stdout := &bytes.Buffer{}
385 cmd := exec.Command(a.cmd, a.args...)
386 cmd.Env = env
387 cmd.Stderr = a.stderr
388 cmd.Stdout = stdout
389 if a.interactive {
390 cmd.Stdin = a.stdin
391 }
392
393 if err := cmd.Run(); err != nil {
394 return a.wrapCmdRunErrorLocked(err)
395 }
396
397 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
398 if err != nil {
399 return fmt.Errorf("decoding stdout: %v", err)
400 }
401 if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
402 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
403 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
404 }
405
406 if cred.Status == nil {
407 return fmt.Errorf("exec plugin didn't return a status field")
408 }
409 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
410 return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
411 }
412 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
413 return fmt.Errorf("exec plugin returned only certificate or key, not both")
414 }
415
416 if cred.Status.ExpirationTimestamp != nil {
417 a.exp = cred.Status.ExpirationTimestamp.Time
418 } else {
419 a.exp = time.Time{}
420 }
421
422 newCreds := &credentials{
423 token: cred.Status.Token,
424 }
425 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
426 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
427 if err != nil {
428 return fmt.Errorf("failed parsing client key/certificate: %v", err)
429 }
430
431 // Leaf is initialized to be nil:
432 // https://golang.org/pkg/crypto/tls/#X509KeyPair
433 // Leaf certificate is the first certificate:
434 // https://golang.org/pkg/crypto/tls/#Certificate
435 // Populating leaf is useful for quickly accessing the underlying x509
436 // certificate values.
437 cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
438 if err != nil {
439 return fmt.Errorf("failed parsing client leaf certificate: %v", err)
440 }
441 newCreds.cert = &cert
442 }
443
444 oldCreds := a.cachedCreds
445 a.cachedCreds = newCreds
446 // Only close all connections when TLS cert rotates. Token rotation doesn't
447 // need the extra noise.
448 if oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
449 // Can be nil if the exec auth plugin only returned token auth.
450 if oldCreds.cert != nil && oldCreds.cert.Leaf != nil {
451 metrics.ClientCertRotationAge.Observe(time.Now().Sub(oldCreds.cert.Leaf.NotBefore))
452 }
453 for _, onRotate := range a.onRotateList {
454 onRotate()
455 }
456 }
457
458 expiry := time.Time{}
459 if a.cachedCreds.cert != nil && a.cachedCreds.cert.Leaf != nil {
460 expiry = a.cachedCreds.cert.Leaf.NotAfter
461 }
462 expirationMetrics.set(a, expiry)
463 return nil
464}
465
466// wrapCmdRunErrorLocked pulls out the code to construct a helpful error message
467// for when the exec plugin's binary fails to Run().
468//
469// It must be called while holding the Authenticator's mutex.
470func (a *Authenticator) wrapCmdRunErrorLocked(err error) error {
471 switch err.(type) {
472 case *exec.Error: // Binary does not exist (see exec.Error).
473 builder := strings.Builder{}
474 fmt.Fprintf(&builder, "exec: executable %s not found", a.cmd)
475
476 a.sometimes.Do(func() {
477 fmt.Fprint(&builder, installHintVerboseHelp)
478 if a.installHint != "" {
479 fmt.Fprintf(&builder, "\n\n%s", a.installHint)
480 }
481 })
482
483 return errors.New(builder.String())
484
485 case *exec.ExitError: // Binary execution failed (see exec.Cmd.Run()).
486 e := err.(*exec.ExitError)
487 return fmt.Errorf(
488 "exec: executable %s failed with exit code %d",
489 a.cmd,
490 e.ProcessState.ExitCode(),
491 )
492
493 default:
494 return fmt.Errorf("exec: %v", err)
495 }
496}