/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package exec
18
19import (
20 "bytes"
21 "context"
22 "crypto/tls"
23 "errors"
24 "fmt"
25 "io"
26 "net"
27 "net/http"
28 "os"
29 "os/exec"
30 "reflect"
31 "sync"
32 "time"
33
34 "golang.org/x/crypto/ssh/terminal"
35 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 "k8s.io/apimachinery/pkg/runtime/serializer"
39 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40 "k8s.io/client-go/pkg/apis/clientauthentication"
41 "k8s.io/client-go/pkg/apis/clientauthentication/v1alpha1"
42 "k8s.io/client-go/pkg/apis/clientauthentication/v1beta1"
43 "k8s.io/client-go/tools/clientcmd/api"
44 "k8s.io/client-go/transport"
45 "k8s.io/client-go/util/connrotation"
46 "k8s.io/klog"
47)
48
49const execInfoEnv = "KUBERNETES_EXEC_INFO"
50
51var scheme = runtime.NewScheme()
52var codecs = serializer.NewCodecFactory(scheme)
53
54func init() {
55 v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
56 utilruntime.Must(v1alpha1.AddToScheme(scheme))
57 utilruntime.Must(v1beta1.AddToScheme(scheme))
58 utilruntime.Must(clientauthentication.AddToScheme(scheme))
59}
60
61var (
62 // Since transports can be constantly re-initialized by programs like kubectl,
63 // keep a cache of initialized authenticators keyed by a hash of their config.
64 globalCache = newCache()
65 // The list of API versions we accept.
66 apiVersions = map[string]schema.GroupVersion{
67 v1alpha1.SchemeGroupVersion.String(): v1alpha1.SchemeGroupVersion,
68 v1beta1.SchemeGroupVersion.String(): v1beta1.SchemeGroupVersion,
69 }
70)
71
72func newCache() *cache {
73 return &cache{m: make(map[string]*Authenticator)}
74}
75
76func cacheKey(c *api.ExecConfig) string {
77 return fmt.Sprintf("%#v", c)
78}
79
80type cache struct {
81 mu sync.Mutex
82 m map[string]*Authenticator
83}
84
85func (c *cache) get(s string) (*Authenticator, bool) {
86 c.mu.Lock()
87 defer c.mu.Unlock()
88 a, ok := c.m[s]
89 return a, ok
90}
91
92// put inserts an authenticator into the cache. If an authenticator is already
93// associated with the key, the first one is returned instead.
94func (c *cache) put(s string, a *Authenticator) *Authenticator {
95 c.mu.Lock()
96 defer c.mu.Unlock()
97 existing, ok := c.m[s]
98 if ok {
99 return existing
100 }
101 c.m[s] = a
102 return a
103}
104
105// GetAuthenticator returns an exec-based plugin for providing client credentials.
106func GetAuthenticator(config *api.ExecConfig) (*Authenticator, error) {
107 return newAuthenticator(globalCache, config)
108}
109
110func newAuthenticator(c *cache, config *api.ExecConfig) (*Authenticator, error) {
111 key := cacheKey(config)
112 if a, ok := c.get(key); ok {
113 return a, nil
114 }
115
116 gv, ok := apiVersions[config.APIVersion]
117 if !ok {
118 return nil, fmt.Errorf("exec plugin: invalid apiVersion %q", config.APIVersion)
119 }
120
121 a := &Authenticator{
122 cmd: config.Command,
123 args: config.Args,
124 group: gv,
125
126 stdin: os.Stdin,
127 stderr: os.Stderr,
128 interactive: terminal.IsTerminal(int(os.Stdout.Fd())),
129 now: time.Now,
130 environ: os.Environ,
131 }
132
133 for _, env := range config.Env {
134 a.env = append(a.env, env.Name+"="+env.Value)
135 }
136
137 return c.put(key, a), nil
138}
139
140// Authenticator is a client credential provider that rotates credentials by executing a plugin.
141// The plugin input and output are defined by the API group client.authentication.k8s.io.
142type Authenticator struct {
143 // Set by the config
144 cmd string
145 args []string
146 group schema.GroupVersion
147 env []string
148
149 // Stubbable for testing
150 stdin io.Reader
151 stderr io.Writer
152 interactive bool
153 now func() time.Time
154 environ func() []string
155
156 // Cached results.
157 //
158 // The mutex also guards calling the plugin. Since the plugin could be
159 // interactive we want to make sure it's only called once.
160 mu sync.Mutex
161 cachedCreds *credentials
162 exp time.Time
163
164 onRotate func()
165}
166
// credentials is the result of one plugin run: a bearer token, a client TLS
// certificate, or both.
type credentials struct {
	// token is sent as a Bearer Authorization header when non-empty.
	token string
	// cert is the parsed client certificate/key pair, or nil if none was returned.
	cert *tls.Certificate
}
171
172// UpdateTransportConfig updates the transport.Config to use credentials
173// returned by the plugin.
174func (a *Authenticator) UpdateTransportConfig(c *transport.Config) error {
175 wt := c.WrapTransport
176 c.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
177 if wt != nil {
178 rt = wt(rt)
179 }
180 return &roundTripper{a, rt}
181 }
182
183 if c.TLS.GetCert != nil {
184 return errors.New("can't add TLS certificate callback: transport.Config.TLS.GetCert already set")
185 }
186 c.TLS.GetCert = a.cert
187
188 var dial func(ctx context.Context, network, addr string) (net.Conn, error)
189 if c.Dial != nil {
190 dial = c.Dial
191 } else {
192 dial = (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext
193 }
194 d := connrotation.NewDialer(dial)
195 a.onRotate = d.CloseAll
196 c.Dial = d.DialContext
197
198 return nil
199}
200
201type roundTripper struct {
202 a *Authenticator
203 base http.RoundTripper
204}
205
206func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
207 // If a user has already set credentials, use that. This makes commands like
208 // "kubectl get --token (token) pods" work.
209 if req.Header.Get("Authorization") != "" {
210 return r.base.RoundTrip(req)
211 }
212
213 creds, err := r.a.getCreds()
214 if err != nil {
215 return nil, fmt.Errorf("getting credentials: %v", err)
216 }
217 if creds.token != "" {
218 req.Header.Set("Authorization", "Bearer "+creds.token)
219 }
220
221 res, err := r.base.RoundTrip(req)
222 if err != nil {
223 return nil, err
224 }
225 if res.StatusCode == http.StatusUnauthorized {
226 resp := &clientauthentication.Response{
227 Header: res.Header,
228 Code: int32(res.StatusCode),
229 }
230 if err := r.a.maybeRefreshCreds(creds, resp); err != nil {
231 klog.Errorf("refreshing credentials: %v", err)
232 }
233 }
234 return res, nil
235}
236
237func (a *Authenticator) credsExpired() bool {
238 if a.exp.IsZero() {
239 return false
240 }
241 return a.now().After(a.exp)
242}
243
244func (a *Authenticator) cert() (*tls.Certificate, error) {
245 creds, err := a.getCreds()
246 if err != nil {
247 return nil, err
248 }
249 return creds.cert, nil
250}
251
252func (a *Authenticator) getCreds() (*credentials, error) {
253 a.mu.Lock()
254 defer a.mu.Unlock()
255 if a.cachedCreds != nil && !a.credsExpired() {
256 return a.cachedCreds, nil
257 }
258
259 if err := a.refreshCredsLocked(nil); err != nil {
260 return nil, err
261 }
262 return a.cachedCreds, nil
263}
264
265// maybeRefreshCreds executes the plugin to force a rotation of the
266// credentials, unless they were rotated already.
267func (a *Authenticator) maybeRefreshCreds(creds *credentials, r *clientauthentication.Response) error {
268 a.mu.Lock()
269 defer a.mu.Unlock()
270
271 // Since we're not making a new pointer to a.cachedCreds in getCreds, no
272 // need to do deep comparison.
273 if creds != a.cachedCreds {
274 // Credentials already rotated.
275 return nil
276 }
277
278 return a.refreshCredsLocked(r)
279}
280
281// refreshCredsLocked executes the plugin and reads the credentials from
282// stdout. It must be called while holding the Authenticator's mutex.
283func (a *Authenticator) refreshCredsLocked(r *clientauthentication.Response) error {
284 cred := &clientauthentication.ExecCredential{
285 Spec: clientauthentication.ExecCredentialSpec{
286 Response: r,
287 Interactive: a.interactive,
288 },
289 }
290
291 env := append(a.environ(), a.env...)
292 if a.group == v1alpha1.SchemeGroupVersion {
293 // Input spec disabled for beta due to lack of use. Possibly re-enable this later if
294 // someone wants it back.
295 //
296 // See: https://github.com/kubernetes/kubernetes/issues/61796
297 data, err := runtime.Encode(codecs.LegacyCodec(a.group), cred)
298 if err != nil {
299 return fmt.Errorf("encode ExecCredentials: %v", err)
300 }
301 env = append(env, fmt.Sprintf("%s=%s", execInfoEnv, data))
302 }
303
304 stdout := &bytes.Buffer{}
305 cmd := exec.Command(a.cmd, a.args...)
306 cmd.Env = env
307 cmd.Stderr = a.stderr
308 cmd.Stdout = stdout
309 if a.interactive {
310 cmd.Stdin = a.stdin
311 }
312
313 if err := cmd.Run(); err != nil {
314 return fmt.Errorf("exec: %v", err)
315 }
316
317 _, gvk, err := codecs.UniversalDecoder(a.group).Decode(stdout.Bytes(), nil, cred)
318 if err != nil {
319 return fmt.Errorf("decoding stdout: %v", err)
320 }
321 if gvk.Group != a.group.Group || gvk.Version != a.group.Version {
322 return fmt.Errorf("exec plugin is configured to use API version %s, plugin returned version %s",
323 a.group, schema.GroupVersion{Group: gvk.Group, Version: gvk.Version})
324 }
325
326 if cred.Status == nil {
327 return fmt.Errorf("exec plugin didn't return a status field")
328 }
329 if cred.Status.Token == "" && cred.Status.ClientCertificateData == "" && cred.Status.ClientKeyData == "" {
330 return fmt.Errorf("exec plugin didn't return a token or cert/key pair")
331 }
332 if (cred.Status.ClientCertificateData == "") != (cred.Status.ClientKeyData == "") {
333 return fmt.Errorf("exec plugin returned only certificate or key, not both")
334 }
335
336 if cred.Status.ExpirationTimestamp != nil {
337 a.exp = cred.Status.ExpirationTimestamp.Time
338 } else {
339 a.exp = time.Time{}
340 }
341
342 newCreds := &credentials{
343 token: cred.Status.Token,
344 }
345 if cred.Status.ClientKeyData != "" && cred.Status.ClientCertificateData != "" {
346 cert, err := tls.X509KeyPair([]byte(cred.Status.ClientCertificateData), []byte(cred.Status.ClientKeyData))
347 if err != nil {
348 return fmt.Errorf("failed parsing client key/certificate: %v", err)
349 }
350 newCreds.cert = &cert
351 }
352
353 oldCreds := a.cachedCreds
354 a.cachedCreds = newCreds
355 // Only close all connections when TLS cert rotates. Token rotation doesn't
356 // need the extra noise.
357 if a.onRotate != nil && oldCreds != nil && !reflect.DeepEqual(oldCreds.cert, a.cachedCreds.cert) {
358 a.onRotate()
359 }
360 return nil
361}