blob: b88902c10314c3b0266e9f362f3453173253e852 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package exec
18
19import (
20 "bytes"
21 "context"
22 "crypto/tls"
23 "errors"
24 "fmt"
25 "io"
26 "net"
27 "net/http"
28 "os"
29 "os/exec"
30 "reflect"
31 "sync"
32 "time"
33
34 "github.com/davecgh/go-spew/spew"
35 "golang.org/x/crypto/ssh/terminal"
36 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/runtime"
38 "k8s.io/apimachinery/pkg/runtime/schema"
39 "k8s.io/apimachinery/pkg/runtime/serializer"
40 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
41 "k8s.io/client-go/pkg/apis/clientauthentication"
42 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
43 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
44 "k8s.io/client-go/tools/clientcmd/api"
45 "k8s.io/client-go/transport"
46 "k8s.io/client-go/util/connrotation"
47 "k8s.io/klog"
48)
49
50const execInfoEnv = "KUBERNETES_EXEC_INFO"
51
52var scheme = runtime.NewScheme()
53var codecs = serializer.NewCodecFactory(scheme)
54
55func init() {
56 v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
57 utilruntime.Must(v1alpha1.AddToScheme(scheme))
58 utilruntime.Must(v1beta1.AddToScheme(scheme))
59 utilruntime.Must(clientauthentication.AddToScheme(scheme))
60}
61
62var (
63 // Since transports can be constantly re-initialized by programs like kubectl,
64 // keep a cache of initialized authenticators keyed by a hash of their config.
65 globalCache = newCache()
66 // The list of API versions we accept.
67 apiVersions = map[string]schema.GroupVersion{
68 v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
69 v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion,
70 }
71)
72
73func newCache() *cache {
74 return &cache{m: make(map[string]*Authenticator)}
75}
76
77var spewConfig = &spew.ConfigState{DisableMethods: true, Indent: " "}
78
79func cacheKey(c *api.ExecConfig) string {
80 return spewConfig.Sprint(c)
81}
82
83type cache struct {
84 mu sync.Mutex
85 m map[string]*Authenticator
86}
87
88func (c *cache) get(s string) (*Authenticator, bool) {
89 c.mu.Lock()
90 defer c.mu.Unlock()
91 a, ok := c.m[s]
92 return a, ok
93}
94
95// put inserts an authenticator into the cache. If an authenticator is already
96// associated with the key, the first one is returned instead.
97func (c *cache) put(s string, a *Authenticator) *Authenticator {
98 c.mu.Lock()
99 defer c.mu.Unlock()
100 existing, ok := c.m[s]
101 if ok {
102 return existing
103 }
104 c.m[s] = a
105 return a
106}
107
108// GetAuthenticator returns an exec-based plugin for providing client credentials.
109func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
110 return newAuthenticator(globalCache, config)
111}
112
113func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
114 key := cacheKey(config)
115 if a, ok := c.get(key); ok {
116 return a, nil
117 }
118
119 gv, ok := apiVersions[config.APIVersion]
120 if !ok {
121 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
122 }
123
124 a := &Authenticator{
125 cmd: config.Command,
126 args: config.Args,
127 group: gv,
128
129 stdin: os.Stdin,
130 stderr: os.Stderr,
131 interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
132 now: time.Now,
133 environ: os.Environ,
134 }
135
136 for _, env := range config.Env {
137 a.env = append(a.env, env.Name+"="+env.Value)
138 }
139
140 return c.put(key, a), nil
141}
142
143// Authenticator is a client credential provider that rotates credentials by executing a plugin.
144// The plugin input and output are defined by the API group client.authentication.k8s.io.
145type Authenticator struct {
146 // Set by the config
147 cmd string
148 args []string
149 group schema.GroupVersion
150 env []string
151
152 // Stubbable for testing
153 stdin io.Reader
154 stderr io.Writer
155 interactive bool
156 now func() time.Time
157 environ func() []string
158
159 // Cached results.
160 //
161 // The mutex also guards calling the plugin. Since the plugin could be
162 // interactive we want to make sure it's only called once.
163 mu sync.Mutex
164 cachedCreds *credentials
165 exp time.Time
166
167 onRotate func()
168}
169
170type credentials struct {
171 token string
172 cert *tls.Certificate
173}
174
175// UpdateTransportConfig updates the transport.Config to use credentials
176// returned by the plugin.
177func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
178 c.Wrap(func(rt http.RoundTripper) http.RoundTripper {
179 return &roundTripper{a, rt}
180 })
181
182 if c.TLS.GetCert != nil {
183 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
184 }
185 c.TLS.GetCert = a.cert
186
187 var dial func(ctx context.Context, network, addr string) (net.Conn, error)
188 if c.Dial != nil {
189 dial = c.Dial
190 } else {
191 dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
192 }
193 d := connrotation.NewDialer(dial)
194 a.onRotate = d.CloseAll
195 c.Dial = d.DialContext
196
197 return nil
198}
199
200type roundTripper struct {
201 a *Authenticator
202 base http.RoundTripper
203}
204
205func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
206 // If a user has already set credentials, use that. This makes commands like
207 // "kubectl get --token (token) pods" work.
208 if req.Header.Get("Authorization") != "" {
209 return r.base.RoundTrip(req)
210 }
211
212 creds, err := r.a.getCreds()
213 if err != nil {
214 return nil, fmt.Errorf("getting credentials: %v", err)
215 }
216 if creds.token != "" {
217 req.Header.Set("Authorization", "Bearer "+creds.token)
218 }
219
220 res, err := r.base.RoundTrip(req)
221 if err != nil {
222 return nil, err
223 }
224 if res.StatusCode == http.StatusUnauthorized {
225 resp := &clientauthentication.Response{
226 Header: res.Header,
227 Code: int32(res.StatusCode),
228 }
229 if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
230 klog.Errorf("refreshing credentials: %v", err)
231 }
232 }
233 return res, nil
234}
235
236func (a *Authenticator) credsExpired() bool {
237 if a.exp.IsZero() {
238 return false
239 }
240 return a.now().After(a.exp)
241}
242
243func (a *Authenticator) cert() (*tls.Certificate, error) {
244 creds, err := a.getCreds()
245 if err != nil {
246 return nil, err
247 }
248 return creds.cert, nil
249}
250
251func (a *Authenticator) getCreds() (*credentials, error) {
252 a.mu.Lock()
253 defer a.mu.Unlock()
254 if a.cachedCreds != nil && !a.credsExpired() {
255 return a.cachedCreds, nil
256 }
257
258 if err := a.refreshCredsLocked(nil); err != nil {
259 return nil, err
260 }
261 return a.cachedCreds, nil
262}
263
264// maybeRefreshCreds executes the plugin to force a rotation of the
265// credentials, unless they were rotated already.
266func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
267 a.mu.Lock()
268 defer a.mu.Unlock()
269
270 // Since we're not making a new pointer to a.cachedCreds in getCreds, no
271 // need to do deep comparison.
272 if creds != a.cachedCreds {
273 // Credentials already rotated.
274 return nil
275 }
276
277 return a.refreshCredsLocked(r)
278}
279
280// refreshCredsLocked executes the plugin and reads the credentials from
281// stdout. It must be called while holding the Authenticator's mutex.
282func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
283 cred := &clientauthentication.ExecCredential{
284 Spec: clientauthentication.ExecCredentialSpec{
285 Response: r,
286 Interactive: a.interactive,
287 },
288 }
289
290 env := append(a.environ(), a.env...)
291 if a.group == v1alpha1.SchemeGroupVersion {
292 // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
293 // someone wants it back.
294 //
295 // See: https://github.com/kubernetes/kubernetes/issues/61796
296 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
297 if err != nil {
298 return fmt.Errorf("encode ExecCredentials: %v", err)
299 }
300 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
301 }
302
303 stdout := &bytes.Buffer{}
304 cmd := exec.Command(a.cmd, a.args...)
305 cmd.Env = env
306 cmd.Stderr = a.stderr
307 cmd.Stdout = stdout
308 if a.interactive {
309 cmd.Stdin = a.stdin
310 }
311
312 if err := cmd.Run(); err != nil {
313 return fmt.Errorf("exec: %v", err)
314 }
315
316 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
317 if err != nil {
318 return fmt.Errorf("decoding stdout: %v", err)
319 }
320 if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
321 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
322 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
323 }
324
325 if cred.Status == nil {
326 return fmt.Errorf("exec plugin didn't return a status field")
327 }
328 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
329 return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
330 }
331 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
332 return fmt.Errorf("exec plugin returned only certificate or key, not both")
333 }
334
335 if cred.Status.ExpirationTimestamp != nil {
336 a.exp = cred.Status.ExpirationTimestamp.Time
337 } else {
338 a.exp = time.Time{}
339 }
340
341 newCreds := &credentials{
342 token: cred.Status.Token,
343 }
344 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
345 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
346 if err != nil {
347 return fmt.Errorf("failed parsing client key/certificate: %v", err)
348 }
349 newCreds.cert = &cert
350 }
351
352 oldCreds := a.cachedCreds
353 a.cachedCreds = newCreds
354 // Only close all connections when TLS cert rotates. Token rotation doesn't
355 // need the extra noise.
356 if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
357 a.onRotate()
358 }
359 return nil
360}