blob: 49b632d94aa9aeb5c49a2e03711097bbf9489837 [file] [log] [blame]
Scott Baker8461e152019-10-01 14:44:30 -07001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "github.com/jcmturner/gofork/encoding/asn1"
7 "gopkg.in/jcmturner/gokrb5.v7/asn1tools"
8 "gopkg.in/jcmturner/gokrb5.v7/gssapi"
9 "gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
10 "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
11 "gopkg.in/jcmturner/gokrb5.v7/messages"
12 "gopkg.in/jcmturner/gokrb5.v7/types"
13 "io"
14 "strings"
15 "time"
16)
17
18const (
19 TOK_ID_KRB_AP_REQ = 256
20 GSS_API_GENERIC_TAG = 0x60
21 KRB5_USER_AUTH = 1
22 KRB5_KEYTAB_AUTH = 2
23 GSS_API_INITIAL = 1
24 GSS_API_VERIFY = 2
25 GSS_API_FINISH = 3
26)
27
28type GSSAPIConfig struct {
29 AuthType int
30 KeyTabPath string
31 KerberosConfigPath string
32 ServiceName string
33 Username string
34 Password string
35 Realm string
36}
37
38type GSSAPIKerberosAuth struct {
39 Config *GSSAPIConfig
40 ticket messages.Ticket
41 encKey types.EncryptionKey
42 NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
43 step int
44}
45
46type KerberosClient interface {
47 Login() error
48 GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
49 Domain() string
50 CName() types.PrincipalName
51 Destroy()
52}
53
54/*
55*
56* Appends length in big endian before payload, and send it to kafka
57*
58 */
59
60func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
61 length := len(payload)
62 finalPackage := make([]byte, length+4) //4 byte length header + payload
63 copy(finalPackage[4:], payload)
64 binary.BigEndian.PutUint32(finalPackage, uint32(length))
65 bytes, err := broker.conn.Write(finalPackage)
66 if err != nil {
67 return bytes, err
68 }
69 return bytes, nil
70}
71
72/*
73*
74* Read length (4 bytes) and then read the payload
75*
76 */
77
78func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
79 bytesRead := 0
80 lengthInBytes := make([]byte, 4)
81 bytes, err := io.ReadFull(broker.conn, lengthInBytes)
82 if err != nil {
83 return nil, bytesRead, err
84 }
85 bytesRead += bytes
86 payloadLength := binary.BigEndian.Uint32(lengthInBytes)
87 payloadBytes := make([]byte, payloadLength) // buffer for read..
88 bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
89 if err != nil {
90 return payloadBytes, bytesRead, err
91 }
92 bytesRead += bytes
93 return payloadBytes, bytesRead, nil
94}
95
96func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
97 a := make([]byte, 24)
98 flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
99 binary.LittleEndian.PutUint32(a[:4], 16)
100 for _, i := range flags {
101 f := binary.LittleEndian.Uint32(a[20:24])
102 f |= uint32(i)
103 binary.LittleEndian.PutUint32(a[20:24], f)
104 }
105 return a
106}
107
108/*
109*
110* Construct Kerberos AP_REQ package, conforming to RFC-4120
111* https://tools.ietf.org/html/rfc4120#page-84
112*
113 */
114func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
115 domain string, cname types.PrincipalName,
116 ticket messages.Ticket,
117 sessionKey types.EncryptionKey) ([]byte, error) {
118 auth, err := types.NewAuthenticator(domain, cname)
119 if err != nil {
120 return nil, err
121 }
122 auth.Cksum = types.Checksum{
123 CksumType: chksumtype.GSSAPI,
124 Checksum: krbAuth.newAuthenticatorChecksum(),
125 }
126 APReq, err := messages.NewAPReq(
127 ticket,
128 sessionKey,
129 auth,
130 )
131 if err != nil {
132 return nil, err
133 }
134 aprBytes := make([]byte, 2)
135 binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
136 tb, err := APReq.Marshal()
137 if err != nil {
138 return nil, err
139 }
140 aprBytes = append(aprBytes, tb...)
141 return aprBytes, nil
142}
143
144/*
145*
146* Append the GSS-API header to the payload, conforming to RFC-2743
147* Section 3.1, Mechanism-Independent Token Format
148*
149* https://tools.ietf.org/html/rfc2743#page-81
150*
151* GSSAPIHeader + <specific mechanism payload>
152*
153 */
154func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
155 oidBytes, err := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
156 if err != nil {
157 return nil, err
158 }
159 tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
160 GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
161 GSSHeader = append(GSSHeader, oidBytes...)
162 GSSPackage := append(GSSHeader, payload...)
163 return GSSPackage, nil
164}
165
166func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
167 switch krbAuth.step {
168 case GSS_API_INITIAL:
169 aprBytes, err := krbAuth.createKrb5Token(
170 kerberosClient.Domain(),
171 kerberosClient.CName(),
172 krbAuth.ticket,
173 krbAuth.encKey)
174 if err != nil {
175 return nil, err
176 }
177 krbAuth.step = GSS_API_VERIFY
178 return krbAuth.appendGSSAPIHeader(aprBytes)
179 case GSS_API_VERIFY:
180 wrapTokenReq := gssapi.WrapToken{}
181 if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
182 return nil, err
183 }
184 // Validate response.
185 isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
186 if !isValid {
187 return nil, err
188 }
189
190 wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
191 if err != nil {
192 return nil, err
193 }
194 krbAuth.step = GSS_API_FINISH
195 return wrapTokenResponse.Marshal()
196 }
197 return nil, nil
198}
199
200/* This does the handshake for authorization */
201func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
202
203 kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
204 if err != nil {
205 Logger.Printf("Kerberos client error: %s", err)
206 return err
207 }
208
209 err = kerberosClient.Login()
210 if err != nil {
211 Logger.Printf("Kerberos client error: %s", err)
212 return err
213 }
214 // Construct SPN using serviceName and host
215 // SPN format: <SERVICE>/<FQDN>
216
217 host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
218 spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
219
220 ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
221
222 if err != nil {
223 Logger.Printf("Error getting Kerberos service ticket : %s", err)
224 return err
225 }
226 krbAuth.ticket = ticket
227 krbAuth.encKey = encKey
228 krbAuth.step = GSS_API_INITIAL
229 var receivedBytes []byte = nil
230 defer kerberosClient.Destroy()
231 for {
232 packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
233 if err != nil {
234 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
235 return err
236 }
237 requestTime := time.Now()
238 bytesWritten, err := krbAuth.writePackage(broker, packBytes)
239 if err != nil {
240 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
241 return err
242 }
243 broker.updateOutgoingCommunicationMetrics(bytesWritten)
244 if krbAuth.step == GSS_API_VERIFY {
245 var bytesRead = 0
246 receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
247 requestLatency := time.Since(requestTime)
248 broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
249 if err != nil {
250 Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
251 return err
252 }
253 } else if krbAuth.step == GSS_API_FINISH {
254 return nil
255 }
256 }
257}