blob: 57f3ecbb2d075e664e9dbc3431f0704ebeaf6adb [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "encoding/asn1"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "strings"
9 "time"
10
11 "gopkg.in/jcmturner/gokrb5.v7/asn1tools"
12 "gopkg.in/jcmturner/gokrb5.v7/gssapi"
13 "gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
14 "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
15 "gopkg.in/jcmturner/gokrb5.v7/messages"
16 "gopkg.in/jcmturner/gokrb5.v7/types"
17)
18
// Wire-format constants used when building the initial GSS-API token.
const (
	// TOK_ID_KRB_AP_REQ is the 2-byte token identifier that createKrb5Token
	// writes (big endian) in front of the marshalled AP-REQ.
	TOK_ID_KRB_AP_REQ = 256
	// GSS_API_GENERIC_TAG is the first byte of the header emitted by
	// appendGSSAPIHeader.
	GSS_API_GENERIC_TAG = 0x60
)

// Authentication modes; presumably the accepted values of
// GSSAPIConfig.AuthType (password vs. keytab login) — confirm against the
// Kerberos client constructor, which is not part of this file.
const (
	KRB5_USER_AUTH   = 1
	KRB5_KEYTAB_AUTH = 2
)

// Handshake states stored in GSSAPIKerberosAuth.step and advanced by
// initSecContext: INITIAL -> VERIFY -> FINISH.
const (
	GSS_API_INITIAL = 1
	GSS_API_VERIFY  = 2
	GSS_API_FINISH  = 3
)
28
// GSSAPIConfig carries the Kerberos settings that are handed to
// GSSAPIKerberosAuth.NewKerberosClientFunc when a handshake starts.
//
// How each field is interpreted is decided by the Kerberos client
// constructor, which lives outside this file; the per-field notes below are
// therefore assumptions to be confirmed against it.
type GSSAPIConfig struct {
	// AuthType selects the login mode; presumably KRB5_USER_AUTH or
	// KRB5_KEYTAB_AUTH.
	AuthType int
	// KeyTabPath is presumably the keytab file used for keytab logins.
	KeyTabPath string
	// KerberosConfigPath is presumably the path to the krb5 configuration.
	KerberosConfigPath string
	// ServiceName is presumably the service part of the broker SPN
	// ("<service>/<host>").
	ServiceName string
	// Username is presumably the principal name to log in as.
	Username string
	// Password is presumably used for password-based logins only.
	Password string
	// Realm is presumably the Kerberos realm of the principal.
	Realm string
}
38
39type GSSAPIKerberosAuth struct {
40 Config *GSSAPIConfig
41 ticket messages.Ticket
42 encKey types.EncryptionKey
43 NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
44 step int
45}
46
47type KerberosClient interface {
48 Login() error
49 GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
50 Domain() string
51 CName() types.PrincipalName
52 Destroy()
53}
54
55/*
56*
57* Appends length in big endian before payload, and send it to kafka
58*
59 */
60
61func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
62 length := len(payload)
63 finalPackage := make([]byte, length+4) //4 byte length header + payload
64 copy(finalPackage[4:], payload)
65 binary.BigEndian.PutUint32(finalPackage, uint32(length))
66 bytes, err := broker.conn.Write(finalPackage)
67 if err != nil {
68 return bytes, err
69 }
70 return bytes, nil
71}
72
73/*
74*
75* Read length (4 bytes) and then read the payload
76*
77 */
78
79func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
80 bytesRead := 0
81 lengthInBytes := make([]byte, 4)
82 bytes, err := io.ReadFull(broker.conn, lengthInBytes)
83 if err != nil {
84 return nil, bytesRead, err
85 }
86 bytesRead += bytes
87 payloadLength := binary.BigEndian.Uint32(lengthInBytes)
88 payloadBytes := make([]byte, payloadLength) // buffer for read..
89 bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
90 if err != nil {
91 return payloadBytes, bytesRead, err
92 }
93 bytesRead += bytes
94 return payloadBytes, bytesRead, nil
95}
96
97func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
98 a := make([]byte, 24)
99 flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
100 binary.LittleEndian.PutUint32(a[:4], 16)
101 for _, i := range flags {
102 f := binary.LittleEndian.Uint32(a[20:24])
103 f |= uint32(i)
104 binary.LittleEndian.PutUint32(a[20:24], f)
105 }
106 return a
107}
108
109/*
110*
111* Construct Kerberos AP_REQ package, conforming to RFC-4120
112* https://tools.ietf.org/html/rfc4120#page-84
113*
114 */
115func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
116 domain string, cname types.PrincipalName,
117 ticket messages.Ticket,
118 sessionKey types.EncryptionKey) ([]byte, error) {
119 auth, err := types.NewAuthenticator(domain, cname)
120 if err != nil {
121 return nil, err
122 }
123 auth.Cksum = types.Checksum{
124 CksumType: chksumtype.GSSAPI,
125 Checksum: krbAuth.newAuthenticatorChecksum(),
126 }
127 APReq, err := messages.NewAPReq(
128 ticket,
129 sessionKey,
130 auth,
131 )
132 if err != nil {
133 return nil, err
134 }
135 aprBytes := make([]byte, 2)
136 binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
137 tb, err := APReq.Marshal()
138 if err != nil {
139 return nil, err
140 }
141 aprBytes = append(aprBytes, tb...)
142 return aprBytes, nil
143}
144
145/*
146*
147* Append the GSS-API header to the payload, conforming to RFC-2743
148* Section 3.1, Mechanism-Independent Token Format
149*
150* https://tools.ietf.org/html/rfc2743#page-81
151*
152* GSSAPIHeader + <specific mechanism payload>
153*
154 */
155func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
156 oidBytes, err := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
157 if err != nil {
158 return nil, err
159 }
160 tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
161 GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
162 GSSHeader = append(GSSHeader, oidBytes...)
163 GSSPackage := append(GSSHeader, payload...)
164 return GSSPackage, nil
165}
166
167func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
168 switch krbAuth.step {
169 case GSS_API_INITIAL:
170 aprBytes, err := krbAuth.createKrb5Token(
171 kerberosClient.Domain(),
172 kerberosClient.CName(),
173 krbAuth.ticket,
174 krbAuth.encKey)
175 if err != nil {
176 return nil, err
177 }
178 krbAuth.step = GSS_API_VERIFY
179 return krbAuth.appendGSSAPIHeader(aprBytes)
180 case GSS_API_VERIFY:
181 wrapTokenReq := gssapi.WrapToken{}
182 if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
183 return nil, err
184 }
185 // Validate response.
186 isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
187 if !isValid {
188 return nil, err
189 }
190
191 wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
192 if err != nil {
193 return nil, err
194 }
195 krbAuth.step = GSS_API_FINISH
196 return wrapTokenResponse.Marshal()
197 }
198 return nil, nil
199}
200
201/* This does the handshake for authorization */
202func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
203
204 kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
205 if err != nil {
206 Logger.Printf("Kerberos client error: %s", err)
207 return err
208 }
209
210 err = kerberosClient.Login()
211 if err != nil {
212 Logger.Printf("Kerberos client error: %s", err)
213 return err
214 }
215 // Construct SPN using serviceName and host
216 // SPN format: <SERVICE>/<FQDN>
217
218 host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
219 spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
220
221 ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
222
223 if err != nil {
224 Logger.Printf("Error getting Kerberos service ticket : %s", err)
225 return err
226 }
227 krbAuth.ticket = ticket
228 krbAuth.encKey = encKey
229 krbAuth.step = GSS_API_INITIAL
230 var receivedBytes []byte = nil
231 defer kerberosClient.Destroy()
232 for {
233 packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
234 if err != nil {
235 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
236 return err
237 }
238 requestTime := time.Now()
239 bytesWritten, err := krbAuth.writePackage(broker, packBytes)
240 if err != nil {
241 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
242 return err
243 }
244 broker.updateOutgoingCommunicationMetrics(bytesWritten)
245 if krbAuth.step == GSS_API_VERIFY {
246 var bytesRead = 0
247 receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
248 requestLatency := time.Since(requestTime)
249 broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
250 if err != nil {
251 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
252 return err
253 }
254 } else if krbAuth.step == GSS_API_FINISH {
255 return nil
256 }
257 }
258}