blob: dc22b6ec4cc23fda69abb16ea4fdf6186b39a898 [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2020 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package transport
18
19import (
20 "bytes"
21 "crypto/tls"
22 "fmt"
23 "reflect"
24 "sync"
25 "time"
26
27 utilnet "k8s.io/apimachinery/pkg/util/net"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/client-go/util/connrotation"
31 "k8s.io/client-go/util/workqueue"
32 "k8s.io/klog/v2"
33)
34
// workItemKey is the single, content-free key placed on the queue; enqueuing it
// simply schedules another reload of the client certificate.
const workItemKey = "key"
36
// CertCallbackRefreshDuration is the interval at which Run schedules a reload
// of the client certificate. It is a variable rather than a constant so that
// integration tests can shorten the reload interval.
var CertCallbackRefreshDuration = 5 * time.Minute
39
// reloadFunc produces the client certificate to present. Its signature matches
// a tls.Config.GetClientCertificate hook; loadClientCert calls it with a nil
// *tls.CertificateRequestInfo.
type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
41
42type dynamicClientCert struct {
43 clientCert *tls.Certificate
44 certMtx sync.RWMutex
45
46 reload reloadFunc
47 connDialer *connrotation.Dialer
48
49 // queue only ever has one item, but it has nice error handling backoff/retry semantics
50 queue workqueue.RateLimitingInterface
51}
52
53func certRotatingDialer(reload reloadFunc, dial utilnet.DialFunc) *dynamicClientCert {
54 d := &dynamicClientCert{
55 reload: reload,
56 connDialer: connrotation.NewDialer(connrotation.DialFunc(dial)),
57 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DynamicClientCertificate"),
58 }
59
60 return d
61}
62
63// loadClientCert calls the callback and rotates connections if needed
64func (c *dynamicClientCert) loadClientCert() (*tls.Certificate, error) {
65 cert, err := c.reload(nil)
66 if err != nil {
67 return nil, err
68 }
69
70 // check to see if we have a change. If the values are the same, do nothing.
71 c.certMtx.RLock()
72 haveCert := c.clientCert != nil
73 if certsEqual(c.clientCert, cert) {
74 c.certMtx.RUnlock()
75 return c.clientCert, nil
76 }
77 c.certMtx.RUnlock()
78
79 c.certMtx.Lock()
80 c.clientCert = cert
81 c.certMtx.Unlock()
82
83 // The first certificate requested is not a rotation that is worth closing connections for
84 if !haveCert {
85 return cert, nil
86 }
87
88 klog.V(1).Infof("certificate rotation detected, shutting down client connections to start using new credentials")
89 c.connDialer.CloseAll()
90
91 return cert, nil
92}
93
94// certsEqual compares tls Certificates, ignoring the Leaf which may get filled in dynamically
95func certsEqual(left, right *tls.Certificate) bool {
96 if left == nil || right == nil {
97 return left == right
98 }
99
100 if !byteMatrixEqual(left.Certificate, right.Certificate) {
101 return false
102 }
103
104 if !reflect.DeepEqual(left.PrivateKey, right.PrivateKey) {
105 return false
106 }
107
108 if !byteMatrixEqual(left.SignedCertificateTimestamps, right.SignedCertificateTimestamps) {
109 return false
110 }
111
112 if !bytes.Equal(left.OCSPStaple, right.OCSPStaple) {
113 return false
114 }
115
116 return true
117}
118
// byteMatrixEqual reports whether two [][]byte values have the same length and
// pairwise-equal rows. Rows are compared with bytes.Equal, so a nil row equals
// an empty one; likewise a nil outer slice equals an empty one.
func byteMatrixEqual(left, right [][]byte) bool {
	if len(left) != len(right) {
		return false
	}
	for i, row := range left {
		if !bytes.Equal(row, right[i]) {
			return false
		}
	}
	return true
}
131
132// run starts the controller and blocks until stopCh is closed.
133func (c *dynamicClientCert) Run(stopCh <-chan struct{}) {
134 defer utilruntime.HandleCrash()
135 defer c.queue.ShutDown()
136
137 klog.V(3).Infof("Starting client certificate rotation controller")
138 defer klog.V(3).Infof("Shutting down client certificate rotation controller")
139
140 go wait.Until(c.runWorker, time.Second, stopCh)
141
142 go wait.PollImmediateUntil(CertCallbackRefreshDuration, func() (bool, error) {
143 c.queue.Add(workItemKey)
144 return false, nil
145 }, stopCh)
146
147 <-stopCh
148}
149
150func (c *dynamicClientCert) runWorker() {
151 for c.processNextWorkItem() {
152 }
153}
154
155func (c *dynamicClientCert) processNextWorkItem() bool {
156 dsKey, quit := c.queue.Get()
157 if quit {
158 return false
159 }
160 defer c.queue.Done(dsKey)
161
162 _, err := c.loadClientCert()
163 if err == nil {
164 c.queue.Forget(dsKey)
165 return true
166 }
167
168 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
169 c.queue.AddRateLimited(dsKey)
170
171 return true
172}
173
174func (c *dynamicClientCert) GetClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
175 return c.loadClientCert()
176}